Commit

fix nans in process exchanges; see hotfix notes and changelog; addresses USEPA#202
dt-woods committed Dec 19, 2023
1 parent ebae901 commit 946463a
Showing 1 changed file with 111 additions and 71 deletions.
182 changes: 111 additions & 71 deletions electricitylci/generation.py
@@ -46,7 +46,6 @@
)
from electricitylci.egrid_emissions_and_waste_by_facility import (
emissions_and_wastes_by_facility,
base_inventory,
)
import facilitymatcher.globals as fmglob # package under development

@@ -63,6 +62,7 @@
CHANGELOG
- Remove module logger.
- Remove unused imports.
- Add missing documentation to methods.
- Clean up formatting towards PEP8.
- Note: the uncertainty calculations in :func:`aggregate_data` are
@@ -72,9 +72,14 @@
unused.
- Replace .values with .squeeze().values when calling a data frame with
only one column of data in :func:`olcaschema_genprocess`.
Created: 2019-06-04
Last edited: 2023-11-16
- Fix groupby for source_db in :func:`calculate_electricity_by_source` to
match the filter used to find multiple source entries.
- Add empty database check in :func:`calculate_electricity_by_source`
Created:
2019-06-04
Last edited:
2023-12-18
"""
__all__ = [
"add_data_collection_score",
@@ -205,6 +210,9 @@ def aggregate_facility_flows(df):
then those show up as separate emissions in the inventory and artificially
inflate the number of emissions for uncertainty calculations.
This method sums all duplicated emissions together (taking the
flow-weighted average of their data quality indicators).
Parameters
----------
df : pandas.DataFrame
@@ -214,6 +222,8 @@ def aggregate_facility_flows(df):
Returns
-------
pandas.DataFrame
The same data frame that was passed, with duplicated emissions
aggregated to a single row.
"""
emission_compartments = [
"emission/air",
@@ -237,7 +247,7 @@
]

def wtd_mean(pdser, total_db, cols):
"""Perform a weighted average using 'FlowAmount' as the weight.
"""Perform a weighted-average of DQI using 'FlowAmount' as the weight.
Parameters
----------
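As a concrete illustration of this weighted average, a minimal sketch with invented numbers (not from the repository): the DQI of two duplicate rows is averaged with 'FlowAmount' as the weight.

import pandas as pd

# Two duplicated rows of the same emission at one facility (toy values).
dup = pd.DataFrame({
    "FlowAmount": [10.0, 30.0],       # the weights
    "DataReliability": [1.0, 3.0],    # the DQI scores to combine
})

# Weighted mean: (10*1 + 30*3) / (10 + 30) = 2.5
w = dup["FlowAmount"]
print((w * dup["DataReliability"]).sum() / w.sum())  # 2.5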
@@ -438,8 +448,8 @@ def calculate_electricity_by_source(db, subregion="BA"):
"""
all_sources = '_'.join(sorted(list(db["Source"].unique())))
power_plant_criteria = db["stage_code"]=="Power plant"
db_powerplant = db.loc[power_plant_criteria, :]
db_nonpower = db.loc[~power_plant_criteria, :]
db_powerplant = db.loc[power_plant_criteria, :].copy()
db_nonpower = db.loc[~power_plant_criteria, :].copy()
region_agg = subregion_col(subregion)

fuel_agg = ["FuelCategory"]
@@ -459,81 +469,91 @@
]
elec_groupby_cols = fuel_agg + ["Year"]

combine_source_by_flow = lambda x: _combine_sources(
x, db, ["FlowName", "Compartment"], 1
)
combine_source_lambda = lambda x: _combine_sources(
x, db_multiple_sources, groupby_cols)

# This is a pretty expensive process when we have to start looking at each
# flow generated in each compartment for each balancing authority area.
# To speed this up, we group by FlowName and Compartment and look
# and try to eliminate flows where all sources are single entities.
source_df = pd.DataFrame()
source_df = pd.DataFrame(
db_powerplant.groupby(["FlowName", "Compartment"])[["Source"]].apply(
combine_source_by_flow
),
columns=["source_list"],
)
source_df[["source_list", "source_string"]] = pd.DataFrame(
source_df["source_list"].values.tolist(), index=source_df.index
)
source_df.reset_index(inplace=True)
old_index = db_powerplant.index
db_powerplant = db_powerplant.merge(
right=source_df,
left_on=["FlowName", "Compartment"],
right_on=["FlowName", "Compartment"],
how="left",
)
db_powerplant.index=old_index
db_multiple_sources = db_powerplant.loc[db_powerplant["source_string"].isna(), :]
if len(db_multiple_sources) > 0:
# HOTFIX: add check for empty powerplant data frame [2023-12-19; TWD]
if len(db_powerplant) == 0:
db_cols = list(db_powerplant.columns) + ['source_list', 'source_string']
db_powerplant = pd.DataFrame(columns=db_cols)
else:
# This is a pretty expensive process when we have to start looking
# at each flow generated in each compartment for each balancing
# authority area. To speed this up, group by FlowName and Compartment
# and try to eliminate flows where all sources are single entities.
combine_source_by_flow = lambda x: _combine_sources(
x, db, ["FlowName", "Compartment"], 1
)
# Find all single-source flows (multi-source flows come back as NaN)
source_df = pd.DataFrame(
db_multiple_sources.groupby(groupby_cols)[["Source"]].apply(
combine_source_lambda
),
db_powerplant.groupby(["FlowName", "Compartment"])[
["Source"]].apply(combine_source_by_flow),
columns=["source_list"],
)
source_df[["source_list", "source_string"]] = pd.DataFrame(
source_df["source_list"].values.tolist(), index=source_df.index
source_df["source_list"].values.tolist(),
index=source_df.index
)
source_df.reset_index(inplace=True)
db_multiple_sources.drop(
columns=["source_list", "source_string"], inplace=True
)
old_index = db_multiple_sources.index
db_multiple_sources = db_multiple_sources.merge(
old_index = db_powerplant.index
db_powerplant = db_powerplant.merge(
right=source_df,
left_on=groupby_cols,
right_on=groupby_cols,
left_on=["FlowName", "Compartment"],
right_on=["FlowName", "Compartment"],
how="left",
)
db_multiple_sources.index = old_index
db_powerplant.loc[
db_powerplant["source_string"].isna(), ["source_string", "source_list"]
] = db_multiple_sources[["source_string", "source_list"]]
db_powerplant.index = old_index

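The empty-frame guard above (the first HOTFIX) exists because the groupby/apply/column-split chain breaks on a frame with zero rows. A minimal sketch of the shape of that guard, using toy column names; this is not the module's _combine_sources logic, only the pattern of the fix:

import pandas as pd

def add_source_columns(df):
    # Mirror the hotfix: an empty frame still gets the columns that
    # downstream code expects to find.
    if len(df) == 0:
        cols = list(df.columns) + ["source_list", "source_string"]
        return pd.DataFrame(columns=cols)
    out = df.copy()
    by_flow = out.groupby("FlowName")["Source"].agg(lambda s: sorted(set(s)))
    out["source_list"] = out["FlowName"].map(by_flow)
    out["source_string"] = out["source_list"].str.join("_")
    return out

empty = pd.DataFrame(columns=["FlowName", "Source"])
print(add_source_columns(empty).columns.tolist())
# ['FlowName', 'Source', 'source_list', 'source_string']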
# Filter out single-source flows, leaving only multi-source flows
db_multiple_sources = db_powerplant.loc[
db_powerplant["source_string"].isna(), :].copy()
if len(db_multiple_sources) > 0:
combine_source_lambda = lambda x: _combine_sources(
x, db_multiple_sources, groupby_cols
)
# HOTFIX: it doesn't make sense to group by a different key;
# it gives different results from the first-pass filter;
# changed to match the criteria above. [2023-12-19; TWD]
source_df = pd.DataFrame(
db_multiple_sources.groupby(["FlowName", "Compartment"])[
["Source"]].apply(combine_source_lambda),
columns=["source_list"],
)
source_df[["source_list", "source_string"]] = pd.DataFrame(
source_df["source_list"].values.tolist(),
index=source_df.index
)
source_df.reset_index(inplace=True)
db_multiple_sources.drop(
columns=["source_list", "source_string"], inplace=True
)
old_index = db_multiple_sources.index
db_multiple_sources = db_multiple_sources.merge(
right=source_df,
left_on=["FlowName", "Compartment"],
right_on=["FlowName", "Compartment"],
how="left",
)
db_multiple_sources.index = old_index
db_powerplant.loc[
db_powerplant["source_string"].isna(),
["source_string", "source_list"]
] = db_multiple_sources[["source_string", "source_list"]]
unique_source_lists = list(db_powerplant["source_string"].unique())
unique_source_lists = [
x for x in unique_source_lists if ((str(x) != "nan"))
]
unique_source_lists = [x for x in unique_source_lists if str(x) != "nan"]
unique_source_lists += [all_sources]
# One set of emissions passed into this routine may be life cycle emissions
# used as proxies for Canadian generation. In those cases the electricity
# generation will be equal to the Electricity already in the dataframe.
elec_sum_lists = list()
unique_source_lists = unique_source_lists+[all_sources]
for src in unique_source_lists:
logging.info(f"Calculating electricity for {src}")
# src_filter = db.apply(lambda x: x["Source"] in src, axis=1)
db["temp_src"] = src
src_filter = [
a in b
for a, b in zip(
db["Source"].values.tolist(), db["temp_src"].values.tolist()
)
]
sub_db = db.loc[src_filter, :]
sub_db = db.loc[src_filter, :].copy()
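The filter above is a plain substring test: a row survives when its single 'Source' tag occurs inside the combined source string. A sketch with invented tags:

combined = "NEI_TRI_eGRID"   # a source_string built by joining sorted sources
sources = ["eGRID", "TRI", "NEI", "RCRAInfo"]

print([s in combined for s in sources])  # [True, True, True, False]

Because this is substring matching, it assumes no source tag is a substring of another.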
sub_db.drop_duplicates(subset=fuel_agg + ["eGRID_ID"], inplace=True)
sub_db_group = sub_db.groupby(elec_groupby_cols, as_index=False).agg(
{"Electricity": [np.sum, np.mean], "eGRID_ID": "count"}
@@ -545,8 +565,8 @@
]
sub_db_group["source_string"] = src
elec_sum_lists.append(sub_db_group)
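The agg call above maps 'Electricity' to two functions at once, so pandas returns hierarchical (MultiIndex) columns that are then flattened into names like electricity_sum. A sketch of that pattern with a toy frame (column names illustrative):

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "FuelCategory": ["GAS", "GAS", "COAL"],
    "Electricity": [100.0, 50.0, 200.0],
    "eGRID_ID": [1, 2, 3],
})
g = df.groupby("FuelCategory", as_index=False).agg(
    {"Electricity": [np.sum, np.mean], "eGRID_ID": "count"}
)
# g.columns is a MultiIndex; flatten it to the names used downstream.
g.columns = ["FuelCategory", "electricity_sum", "electricity_mean",
             "facility_count"]
print(g)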
db_nonpower["source_string"]=all_sources
db_nonpower["source_list"]=[all_sources]*len(db_nonpower)
db_nonpower["source_string"] = all_sources
db_nonpower["source_list"] = [all_sources]*len(db_nonpower)
elec_sums = pd.concat(elec_sum_lists, ignore_index=True)
elec_sums.sort_values(by=elec_groupby_cols, inplace=True)
db = pd.concat([db_powerplant, db_nonpower])
@@ -1104,6 +1124,7 @@ def _wtd_mean(pdser, total_db):
+ fuel_agg
+ ["stage_code", "FlowName", "Compartment", "FlowUUID","Unit"]
)
# NOTE: datatypes should be str, str, int, str
elec_df_groupby_cols = (
region_agg + fuel_agg + ["Year", "source_string"]
)
@@ -1115,6 +1136,7 @@ def _wtd_mean(pdser, total_db):
"FlowUUID",
"Unit"
]
# NOTE: datatypes should be str, int, str
elec_df_groupby_cols = fuel_agg + ["Year", "source_string"]

# Replace primary fuel categories based on EIA Form 923, if requested
@@ -1132,15 +1154,25 @@ def _wtd_mean(pdser, total_db):
not_all, "eGRID_ID"].map(key_df["FuelCategory"])

total_db["FlowUUID"] = total_db["FlowUUID"].fillna(value="dummy-uuid")
# Aggregate multiple emissions of the same type
logging.info("Aggregating multiples of plant emissions")
sz_tdb = len(total_db)
total_db = aggregate_facility_flows(total_db)
logging.debug("Reduce data from %d to %d rows" % (sz_tdb, len(total_db)))
# Calculate electricity totals by region and source
total_db, electricity_df = calculate_electricity_by_source(
total_db, subregion
)
total_db["FlowAmount"].replace(to_replace=0, value=1E-15, inplace=True)

# Inject a false flow amount for "uncertainty" calculations
false_gen = 1e-15
total_db["FlowAmount"].replace(to_replace=0, value=false_gen, inplace=True)
total_db = add_data_collection_score(total_db, electricity_df, subregion)
total_db["facility_emission_factor"] = (
total_db["FlowAmount"] / total_db["Electricity"]
)
# Effectively removes rows with zero Electricity or nan flow amounts;
# for 2016 generation, it's all nans in flow amounts.
total_db.dropna(subset=["facility_emission_factor"], inplace=True)

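This dropna is the heart of the NaN fix: the division yields NaN wherever the numerator is NaN (and for 0/0), and those rows are discarded before emission factors are aggregated. A toy sketch:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "FlowAmount": [5.0, np.nan, 0.0],
    "Electricity": [10.0, 20.0, 0.0],
})
df["facility_emission_factor"] = df["FlowAmount"] / df["Electricity"]
# row 0 -> 0.5; row 1 -> NaN (NaN numerator); row 2 -> NaN (0/0)
df.dropna(subset=["facility_emission_factor"], inplace=True)
print(len(df))  # 1 -- only the clean row survives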
wm = lambda x: _wtd_mean(x, total_db)
@@ -1161,6 +1193,7 @@ def _wtd_mean(pdser, total_db):
"DataReliability": wm,
"facility_emission_factor": ["min", "max", geo_mean],
})

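geo_mean feeds the lognormal uncertainty fields built below; for positive values it is exp(mean(ln x)). A sketch with a hypothetical helper (the module's own implementation sits outside this hunk):

import numpy as np

def geo_mean(x):
    # Geometric mean of strictly positive values: exp(mean(ln x)).
    x = np.asarray(x, dtype=float)
    return float(np.exp(np.log(x).mean()))

print(geo_mean([1.0, 10.0, 100.0]))  # 10.0, the multiplicative middle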
database_f3.columns = groupby_cols + [
"Year",
"source_string",
@@ -1180,13 +1213,16 @@ def _wtd_mean(pdser, total_db):
database_f3.loc[criteria, "uncertaintyLognormParams"] = None

# Merge electricity_sum, electricity_mean, and facility_count data
# HOTFIX: 'Year' must be integer in both dataframes [2023-12-18; TWD]
electricity_df['Year'] = electricity_df['Year'].astype(int)
database_f3['Year'] = database_f3['Year'].astype(int)
database_f3 = database_f3.merge(
right=electricity_df,
left_on=elec_df_groupby_cols,
right_on=elec_df_groupby_cols,
how="left",
on=elec_df_groupby_cols,
how="left"
)

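The astype(int) hotfix above matters because merge keys must agree in dtype as well as value. A sketch of the failure and the fix with toy frames; recent pandas refuses the mismatched merge outright, while older releases silently matched nothing and left NaN sums:

import pandas as pd

left = pd.DataFrame({"Year": ["2016"], "FlowAmount": [1.0]})      # str key
right = pd.DataFrame({"Year": [2016], "electricity_sum": [9.9]})  # int key

try:
    left.merge(right, on="Year", how="left")
except ValueError as err:
    print(err)  # e.g. refusing to merge object and int64 columns

# The fix: force a common integer dtype on both sides before merging.
left["Year"] = left["Year"].astype(int)
right["Year"] = right["Year"].astype(int)
print(left.merge(right, on="Year", how="left"))
#    Year  FlowAmount  electricity_sum
# 0  2016         1.0              9.9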
# Fix Canada by importing 'Electricity' whilst maintaining the indexes
canadian_criteria = database_f3["FuelCategory"] == "ALL"
if region_agg:
canada_db = pd.merge(
@@ -1214,16 +1250,20 @@ def _wtd_mean(pdser, total_db):
database_f3.loc[canada_db.index, "electricity_sum"] = canada_db[
"Electricity"
]
# HOTFIX: Address ZeroDivisionError [2023-12-18; TWD]
# NOTE: Set to zero because the units are per net generation;
# the `fix_val` is used to search for replacements (there are
# no known electricity_sum values less than 0.01, except for
# those that are 0).
fix_val = 1e-4
database_f3.loc[
database_f3['electricity_sum'] == 0, 'electricity_sum'] += fix_val
database_f3["Emission_factor"] = (
database_f3["FlowAmount"] / database_f3["electricity_sum"]
)
database_f3.loc[
database_f3['electricity_sum'] == fix_val, 'Emission_factor'] = 0

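The sentinel pattern above avoids dividing by zero without dropping rows: bump zero denominators to a value that cannot occur naturally, divide, then zero out the factors computed against the sentinel. A toy sketch:

import pandas as pd

df = pd.DataFrame({
    "FlowAmount": [4.0, 2.0],
    "electricity_sum": [8.0, 0.0],  # the second region generated nothing
})
fix_val = 1e-4  # sentinel: no real electricity_sum is this small but nonzero
df.loc[df["electricity_sum"] == 0, "electricity_sum"] += fix_val
df["Emission_factor"] = df["FlowAmount"] / df["electricity_sum"]
df.loc[df["electricity_sum"] == fix_val, "Emission_factor"] = 0
print(df["Emission_factor"].tolist())  # [0.5, 0.0] -- no inf, no exception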
# Fix infinite values, which happen when there is 0 generation,
# common with Canadian mixes
database_f3["Emission_factor"].replace(
to_replace=float("inf"), value=0, inplace=True)
database_f3["Emission_factor"].replace(
to_replace=float("-inf"), value=0, inplace=True)
if region_agg is not None:
database_f3["GeomMean"], database_f3["GeomSD"] = zip(
*database_f3[
