Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding additional outputs wrapper #126

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions mbs_results/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from mbs_results.estimation.estimate import estimate
from mbs_results.imputation.impute import impute
from mbs_results.outlier_detection.detect_outlier import detect_outlier
from mbs_results.outputs.produce_outputs import produce_outputs
from mbs_results.outputs.produce_additional_outputs import (
get_additional_outputs_df,
produce_additional_outputs,
)
from mbs_results.staging.stage_dataframe import stage_dataframe
from mbs_results.utilities.inputs import load_config
from mbs_results.utilities.validation_checks import (
Expand Down Expand Up @@ -32,7 +35,8 @@ def run_mbs_main():
outlier_output = detect_outlier(estimation_output, config)
validate_outlier_detection(outlier_output, config)

produce_outputs(outlier_output, "output_path/")
additional_outputs_df = get_additional_outputs_df(estimation_output, outlier_output)
produce_additional_outputs(config, additional_outputs_df)


if __name__ == "__main__":
Expand Down
47 changes: 34 additions & 13 deletions mbs_results/outputs/get_additional_outputs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
def get_additional_outputs(config: dict, function_mapper: dict) -> None:
import pandas as pd


def get_additional_outputs(
config: dict, function_mapper: dict, additional_outputs_df: pd.DataFrame
) -> dict:
"""
Runs a set of functions as defined in additional_outputs from the config,
the function names must exist in function_mapper which also has the relevant
Expand Down Expand Up @@ -27,25 +32,37 @@ def get_additional_outputs(config: dict, function_mapper: dict) -> None:

Returns
-------
None
dict
Dictionary of additional outputs, with the keys being the names
of the outputs and the values being the outputs to be exported.

Examples
--------
>> example_function = print("Hello world)
>> config = {additional_outputs:["output_name"]}
>> function_mapper = {output_name : example_function}
>> get_additional_outputs(config,function_mapper)
>> example_function = print("Hello world")
>> config = {"additional_outputs" : ["output_name"]}
>> function_mapper = {"output_name" : example_function}
>> get_additional_outputs(config, function_mapper)
>>
>>
>> example_function = function(argA, argB)
>> config = {"additional_outputs" : ["example_output"],
>> "argA": "valueA",
>> "argB": "valueB"}
>> function_mapper = {"example_output" : example_function}
>> get_additional_outputs(config, function_mapper)

"""

additional_outputs = dict()

if not isinstance(config["additional_outputs"], list):

raise ValueError(
"""
In config file additional_outputs must be a list, please use:\n
["all"] to get all outputs\n
[] to get no outputs\n
or a list with the outputs, e.g. ["output_1","output_2"]
In config file additional_outputs must be a list, please use:\n
["all"] to get all outputs\n
[] to get no outputs\n
or a list with the outputs, e.g. ["output_1","output_2"]
"""
)

Expand All @@ -64,12 +81,16 @@ def get_additional_outputs(config: dict, function_mapper: dict) -> None:

if function in function_mapper:

function_mapper[function](**config)
additional_outputs[function] = function_mapper[function](
additional_outputs_df=additional_outputs_df, **config
)

else:
raise ValueError(
f"""
The function {function} is not registerd, check spelling.\n
Currently the registered functions are:\n {function_mapper}
The function {function} is not registered, check spelling.\n
Currently the registered functions are:\n {function_mapper}
"""
)

return additional_outputs
107 changes: 107 additions & 0 deletions mbs_results/outputs/produce_additional_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from importlib import metadata

import pandas as pd

from mbs_results.outputs.get_additional_outputs import get_additional_outputs
from mbs_results.outputs.selective_editing_contributer_output import (
get_selective_editing_contributer_output,
)
from mbs_results.outputs.selective_editing_question_output import (
create_selective_editing_question_output,
)
from mbs_results.outputs.turnover_analysis import create_turnover_output
from mbs_results.outputs.weighted_adj_val_time_series import (
get_weighted_adj_val_time_series,
)


def get_additional_outputs_df(
    estimation_output: pd.DataFrame, outlier_output: pd.DataFrame
):
    """
    Build the single dataframe that the additional outputs are produced from.

    A fixed set of columns is taken from the estimation output and the
    ``outlier_weight`` column from the outlier output is attached to it with
    a left join on reference, period and questioncode.

    Parameters
    ----------
    estimation_output : pd.DataFrame
        Dataframe output from the estimation stage of the pipeline. Must
        contain every column listed in ``estimation_columns`` below.
    outlier_output : pd.DataFrame
        Dataframe output from the outliering stage of the pipeline. Must
        contain reference, period, questioncode and outlier_weight.

    Returns
    -------
    pd.DataFrame
        The selected estimation columns plus ``outlier_weight``. Because the
        join is a left join, estimation rows without a matching outlier row
        are kept and get a missing ``outlier_weight``.

    """

    # Keys shared by both stages; used to select from the outlier output and
    # as the join condition.
    join_keys = ["reference", "period", "questioncode"]

    estimation_columns = [
        "reference",
        "period",
        "design_weight",
        "frosic2007",
        "formtype",
        "questioncode",
        "frotover",
        "calibration_factor",
        "adjustedresponse",
        "status",
        "response",
        "froempment",
        "cell_no",
        "referencename",
        "imputation_flags_adjustedresponse",
        "f_link_adjustedresponse",
        "b_link_adjustedresponse",
        "construction_link",
    ]

    # Only the weight (plus the keys) is carried over from the outlier stage,
    # so no other outlier column can leak into the result.
    return estimation_output[estimation_columns].merge(
        outlier_output[join_keys + ["outlier_weight"]],
        how="left",
        on=join_keys,
    )


def produce_additional_outputs(config: dict, additional_outputs_df: pd.DataFrame):
    """
    Run the registered additional outputs and write each one to a csv file.

    Parameters
    ----------
    config : Dict
        main pipeline configuration. This function itself reads
        ``mbs_file_name`` and ``output_path`` from it; the whole dictionary
        is also forwarded to ``get_additional_outputs``.
    additional_outputs_df : pd.DataFrame
        Dataframe to feed in as arguments for additional outputs

    Returns
    -------
    None.
        Each output is written to
        ``config["output_path"] + "<output>_v<version>_<snapshot>.csv"``,
        where ``<version>`` is the installed version of
        monthly-business-survey-results and ``<snapshot>`` is the part of
        ``config["mbs_file_name"]`` before its first ".".

    """

    # Output name as used in the config -> function that builds that output.
    function_mapper = {
        "selective_editing_contributor": get_selective_editing_contributer_output,
        "selective_editing_question": create_selective_editing_question_output,
        "turnover_output": create_turnover_output,
        "weighted_adj_val_time_series": get_weighted_adj_val_time_series,
    }

    additional_outputs = get_additional_outputs(
        config, function_mapper, additional_outputs_df
    )

    # Nothing to write when get_additional_outputs hands back None
    # (presumably when no additional outputs are listed in the config -
    # confirm against get_additional_outputs).
    if additional_outputs is None:
        return

    package_metadata = metadata.metadata("monthly-business-survey-results")
    file_version_mbs = package_metadata["version"]
    snapshot_name = config["mbs_file_name"].split(".")[0]

    for output_name, output_df in additional_outputs.items():
        filename = f"{output_name}_v{file_version_mbs}_{snapshot_name}.csv"
        # The path is built by plain string concatenation, so output_path is
        # used exactly as given in the config.
        output_df.to_csv(config["output_path"] + filename)
7 changes: 0 additions & 7 deletions mbs_results/outputs/produce_outputs.py

This file was deleted.

43 changes: 19 additions & 24 deletions mbs_results/outputs/selective_editing_contributer_output.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import pandas as pd

from mbs_results.merge_domain import merge_domain
from mbs_results.staging.merge_domain import merge_domain


def get_selective_editing_contributer_output(
input_filepath: str,
domain_filepath: str,
additional_outputs_df: pd.DataFrame,
sic_domain_mapping_path: str,
threshold_filepath: str,
sic_input: str,
sic_mapping: str,
period_selected: int,
**config
) -> pd.DataFrame:
"""
Returns a dataframe containing period, reference, domain_group, and
Expand All @@ -18,17 +17,16 @@ def get_selective_editing_contributer_output(

Parameters
----------
input_filepath : str
Filepath to csv file containing reference, imp_class, period and
SIC columns.
domain_filepath : str
additional_outputs_df : pd.DataFrame
Dataframe containing reference, design_weight, formtype, period and SIC columns.
sic_domain_mapping_path : str
Filepath to csv file containing SIC and domain columns.
threshold_filepath : str
Filepath to csv file containing form type, domain and threshold columns.
sic_input : str
Name of column in input_filepath csv file containing SIC variable.
sic_mapping : str
Name of column in domain_filepath csv file containing SIC variable.
period_selected : int
period to include in outputs
**config: Dict
main pipeline configuration. Can be used to input the entire config dictionary

Returns
-------
Expand All @@ -41,32 +39,29 @@ def get_selective_editing_contributer_output(
>> input_filepath=input_filepath,
>> domain_filepath=domain_filepath,
>> threshold_filepath=threshold_filepath,
>> sic_input="sic_5_digit",
>> sic_mapping="sic_5_digit",
>> period_selected=202201
>> )
"""

input_data = pd.read_csv(
input_filepath,
usecols=["period", "reference", "design_weight", sic_input, "form_type"],
)
input_data = additional_outputs_df[
["period", "reference", "design_weight", "frosic2007", "formtype"]
]

domain_data = pd.read_csv(domain_filepath)
domain_data = pd.read_csv(sic_domain_mapping_path).astype(str)

threshold_mapping = pd.read_csv(threshold_filepath)
threshold_mapping = pd.read_csv(threshold_filepath).astype(str)

selective_editing_contributer_output = merge_domain(
input_data, domain_data, sic_input, sic_mapping
input_data, domain_data, "frosic2007", "sic_5_digit"
)

selective_editing_contributer_output = pd.merge(
selective_editing_contributer_output,
threshold_mapping,
left_on=["form_type", "domain"],
left_on=["formtype", "domain"],
right_on=["form", "domain"],
how="left",
).drop(columns=["form", "form_type"])
).drop(columns=["form", "formtype"])

selective_editing_contributer_output = selective_editing_contributer_output.rename(
columns={"reference": "ruref", "domain": "domain_group"}
Expand Down
Loading
Loading