Skip to content

Commit

Permalink
Show file tree
Hide file tree
Showing 16 changed files with 1,033 additions and 146 deletions.
6 changes: 3 additions & 3 deletions policyengine_taxsim/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .core.input_mapper import import_single_household
from .core.output_mapper import export_single_household
from .core.input_mapper import generate_household
from .core.output_mapper import export_household
from .cli import main as cli

__all__ = ["import_single_household", "export_single_household", "cli"]
__all__ = ["generate_household", "export_household", "cli"]

__version__ = "0.1.0" # Make sure this matches the version in pyproject.toml
12 changes: 6 additions & 6 deletions policyengine_taxsim/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import pandas as pd
from pathlib import Path
try:
from .core.input_mapper import import_single_household
from .core.output_mapper import export_single_household
from .core.input_mapper import generate_household
from .core.output_mapper import export_household
except ImportError:
from policyengine_taxsim.core.input_mapper import import_single_household
from policyengine_taxsim.core.output_mapper import export_single_household
from policyengine_taxsim.core.input_mapper import generate_household
from policyengine_taxsim.core.output_mapper import export_household

@click.command()
@click.argument("input_file", type=click.Path(exists=True))
Expand All @@ -29,8 +29,8 @@ def main(input_file, output):
results = []
for _, row in df.iterrows():
taxsim_input = row.to_dict()
pe_situation = import_single_household(taxsim_input)
taxsim_output = export_single_household(taxsim_input, pe_situation)
pe_situation = generate_household(taxsim_input)
taxsim_output = export_household(taxsim_input, pe_situation)
results.append(taxsim_output)

# Create output dataframe and save to csv
Expand Down
33 changes: 27 additions & 6 deletions policyengine_taxsim/config/variable_mappings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ policyengine_to_taxsim:
- full: 2
- full_text: 5
fica:
variable: employee_social_security_tax
variable: use_placeholder_for_now_until_its_implemented_in_PE
implemented: true
idtl:
- standard : 0
- full: 2
- full_text: 5
variables:
- employee_social_security_tax
- employee_medicare_tax
- additional_medicare_tax
frate:
variable: not_implemented_in_pe
implemented: false
Expand Down Expand Up @@ -63,7 +67,7 @@ policyengine_to_taxsim:
- full: 2
- full_text: 5
v10:
variable: state_agi
variable: adjusted_gross_income
implemented: true
idtl:
- full: 2
Expand Down Expand Up @@ -105,7 +109,7 @@ policyengine_to_taxsim:
- full: 2
- full_text: 5
v17:
variable: taxable_income_deductions
variable: taxable_income_deductions_if_itemizing
implemented: true
idtl:
- full: 2
Expand Down Expand Up @@ -280,6 +284,23 @@ policyengine_to_taxsim:
- full_text: 5

taxsim_to_policyengine:
# This section would be the inverse of the above mapping
# It's left empty for brevity, but you should populate it
# with the inverse relationships for bidirectional conversion
household_situation:
families:
your family:
members: []
households:
your household:
members: []
state_name: {}
marital_units:
your marital unit:
members: []
people: {}
spm_units:
your household:
members: []
tax_units:
your tax unit:
members: []


106 changes: 90 additions & 16 deletions policyengine_taxsim/core/input_mapper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,95 @@
from .utils import (
load_variable_mappings,
get_state_code,
get_state_code, get_ordinal,
)
import copy


def import_single_household(taxsim_vars):
def add_additional_tax_units(state, year, situation):
has_use_tax = ['pa', 'nc', 'ca', 'il', 'in', 'ok']
if state in has_use_tax:
situation["tax_units"]["your tax unit"][f"{state}_use_tax"] = {str(year): 0.0}
return situation


def form_household_situation(year, state, taxsim_vars):
mappings = load_variable_mappings()["taxsim_to_policyengine"]

household_situation = copy.deepcopy(mappings["household_situation"])

depx = taxsim_vars["depx"]
mstat = taxsim_vars["mstat"]

if mstat == 2: # Married filing jointly
members = ["you", "your partner"]
else: # Single, separate, or dependent taxpayer
members = ["you"]

for i in range(1, depx + 1):
members.append(f"your {get_ordinal(i)} dependent")

household_situation["families"]["your family"]["members"] = members
household_situation["households"]["your household"]["members"] = members
household_situation["tax_units"]["your tax unit"]["members"] = members

household_situation = add_additional_tax_units(state.lower(), year, household_situation)

household_situation["spm_units"]["your household"]["members"] = members

if depx > 0:
household_situation["marital_units"] = {
"your marital unit": {
"members": ["you", "your partner"] if mstat == 2 else ["you"]
}
}
for i in range(1, depx + 1):
dep_name = f"your {get_ordinal(i)} dependent"
household_situation["marital_units"][f"{dep_name}'s marital unit"] = {
"members": [dep_name],
"marital_unit_id": {str(year): i}
}
else:
household_situation["marital_units"]["your marital unit"]["members"] = (
["you", "your partner"] if mstat == 2 else ["you"]
)

household_situation["households"]["your household"]["state_name"][str(year)] = state

people = household_situation["people"]

people["you"] = {
"age": {str(year): int(taxsim_vars.get("page", 40))},
"employment_income": {str(year): float(taxsim_vars.get("pwages", 0))}
}

if mstat == 2:
people["your partner"] = {
"age": {str(year): int(taxsim_vars.get("sage", 40))},
"employment_income": {str(year): float(taxsim_vars.get("swages", 0))}
}

for i in range(1, depx + 1):
dep_name = f"your {get_ordinal(i)} dependent"
people[dep_name] = {
"age": {str(year): int(taxsim_vars.get(f"age{i}", 10))},
"employment_income": {str(year): 0}
}

return household_situation


def check_if_exists_or_set_defaults(taxsim_vars):
taxsim_vars["state"] = int(taxsim_vars.get("state",
44) or 44) # set TX (texas) as default is no state field has passed or passed as 0

taxsim_vars["depx"] = int(taxsim_vars.get("depx", 0) or 0)

taxsim_vars["mstat"] = int(taxsim_vars.get("mstat", 1) or 1)

return taxsim_vars


def generate_household(taxsim_vars):
"""
Convert TAXSIM input variables to a PolicyEngine situation.
Expand All @@ -14,24 +99,13 @@ def import_single_household(taxsim_vars):
Returns:
dict: PolicyEngine situation dictionary
"""
mappings = load_variable_mappings()["taxsim_to_policyengine"]

year = str(int(taxsim_vars["year"])) # Ensure year is an integer string

taxsim_vars["state"] = taxsim_vars.get("state", 44) or 44 #set TX texas as default is no state has passed or passed as 0
taxsim_vars = check_if_exists_or_set_defaults(taxsim_vars)

state = get_state_code(taxsim_vars["state"])

situation = {
"people": {
"you": {
"age": {year: int(taxsim_vars.get("page", 40))},
"employment_income": {year: int(taxsim_vars.get("pwages", 0))},
}
},
"households": {
"your household": {"members": ["you"], "state_name": {year: state}}
},
"tax_units": {"your tax unit": {"members": ["you"]}},
}
situation = form_household_situation(year, state, taxsim_vars)

return situation
15 changes: 13 additions & 2 deletions policyengine_taxsim/core/output_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from policyengine_us import Simulation


def export_single_household(taxsim_input, policyengine_situation):
def export_household(taxsim_input, policyengine_situation):
"""
Convert a PolicyEngine situation to TAXSIM output variables.
Expand All @@ -16,7 +16,7 @@ def export_single_household(taxsim_input, policyengine_situation):
dict: Dictionary of TAXSIM output variables
"""
mappings = load_variable_mappings()["policyengine_to_taxsim"]

print(policyengine_situation)
simulation = Simulation(situation=policyengine_situation)

year = list(
Expand All @@ -39,6 +39,9 @@ def export_single_household(taxsim_input, policyengine_situation):
taxsim_output[key] = int(year)
elif key == "state":
taxsim_output[key] = get_state_number(state_name)
elif key == "fica":
pe_variables = value['variables']
taxsim_output[key] = simulate_multiple(simulation, pe_variables, year)
else:
pe_variable = value['variable']

Expand All @@ -57,3 +60,11 @@ def simulate(simulation, variable, year):
return to_roundedup_number(simulation.calculate(variable, period=year))
except:
return 0.00


def simulate_multiple(simulation, variables, year):
try:
total = sum(to_roundedup_number(simulation.calculate(variable, period=year)) for variable in variables)
except:
total = 0.00
return to_roundedup_number(total)
16 changes: 16 additions & 0 deletions policyengine_taxsim/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,19 @@ def to_roundedup_number(value):
return round(value[0], 2)
else:
return round(value, 2)


def get_ordinal(n):
ordinals = {
1: "first",
2: "second",
3: "third",
4: "fourth",
5: "fifth",
6: "sixth",
7: "seventh",
8: "eighth",
9: "ninth",
10: "tenth"
}
return ordinals.get(n, f"{n}th")
65 changes: 65 additions & 0 deletions policyengine_taxsim/exe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import click
import pandas as pd
from pathlib import Path
import sys
import os


# Delay imports until runtime
def get_mappers():
from policyengine_taxsim.core.input_mapper import generate_household
from policyengine_taxsim.core.output_mapper import export_household
return generate_household, export_household


def get_yaml_path():
"""Get the path to YAML whether running as script or frozen executable"""
if getattr(sys, 'frozen', False):
# Running in a bundle
return os.path.join(sys._MEIPASS, "config", "variable_mappings.yaml")
else:
# Running in normal Python
return os.path.join(Path(__file__).parent, "config", "variable_mappings.yaml")


@click.command()
@click.argument("input_file", type=click.Path(exists=True))
@click.option(
"--output",
"-o",
type=click.Path(),
default="output.csv",
help="Output file path",
)
def main(input_file, output):
"""
Process TAXSIM input file and generate PolicyEngine-compatible output.
"""
try:
# Get mapper functions at runtime
import_single_household, export_single_household = get_mappers()

# Read input file
df = pd.read_csv(input_file)

# Process each row
results = []
for _, row in df.iterrows():
taxsim_input = row.to_dict()
pe_situation = import_single_household(taxsim_input)
taxsim_output = export_single_household(taxsim_input, pe_situation)
results.append(taxsim_output)

# Create output dataframe and save to csv
output_df = pd.DataFrame(results)
output_path = Path(output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_df.to_csv(output_path, index=False)
click.echo(f"Output saved to {output}")
except Exception as e:
click.echo(f"Error processing input: {str(e)}", err=True)
raise


if __name__ == "__main__":
main()
Loading

0 comments on commit 526ab08

Please sign in to comment.