forked from USEPA/ElectricityLCI
-
Notifications
You must be signed in to change notification settings - Fork 0
/
generation.py
1772 lines (1561 loc) · 63.2 KB
/
generation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# generation.py
#
##############################################################################
# REQUIRED MODULES
##############################################################################
from datetime import datetime
import logging
import os
import numpy as np
import pandas as pd
from scipy.stats import t
from scipy.special import erfinv
from scipy.optimize import least_squares
from scipy.stats import uniform
# Presence of 'model_specs' indicates that model configuration occurred.
from electricitylci.model_config import model_specs
from electricitylci.aggregation_selector import subregion_col
import electricitylci.ampd_plant_emissions as ampd
from electricitylci.elementaryflows import map_emissions_to_fedelemflows
from electricitylci.dqi import data_collection_lower_bound_to_dqi
from electricitylci.dqi import lookup_score_with_bound_key
from electricitylci.dqi import technological_correlation_lower_bound_to_dqi
from electricitylci.dqi import temporal_correlation_lower_bound_to_dqi
from electricitylci.eia860_facilities import eia860_balancing_authority
from electricitylci.eia923_generation import build_generation_data
from electricitylci.eia923_generation import eia923_primary_fuel
import electricitylci.emissions_other_sources as em_other
from electricitylci.globals import elci_version
from electricitylci.globals import paths
from electricitylci.globals import output_dir
import electricitylci.manual_edits as edits
from electricitylci.process_dictionary_writer import flow_table_creation
from electricitylci.process_dictionary_writer import process_doc_creation
from electricitylci.process_dictionary_writer import ref_exchange_creator
from electricitylci.process_dictionary_writer import uncertainty_table_creation
from electricitylci.process_dictionary_writer import unit
from electricitylci.utils import make_valid_version_num
from electricitylci.utils import set_dir
from electricitylci.utils import write_csv_to_output
from electricitylci.egrid_emissions_and_waste_by_facility import (
emissions_and_wastes_by_facility,
)
import facilitymatcher.globals as fmglob # provided by StEWI
##############################################################################
# MODULE DOCUMENTATION
##############################################################################
__doc__ = """A core module of electricityLCI, it combines all the data,
performs all the necessary calculations for different eGRID subregions or other
desired regional aggregation categories, and creates the dictionaries (i.e.,
the LCA inventories but in python dictionary format) and stores them in
computer memory.
CHANGELOG
- Remove module logger.
- Remove unused imports.
- Add missing documentation to methods.
- Clean up formatting towards PEP8.
- Note: the uncertainty calculations in :func:`aggregate_data` are
questionable (see doc strings of submodules for details).
- Fix the outdated pd.DataFrame.append call in :func:`turn_data_to_dict`
- Remove :func:`add_flow_representativeness_data_quality_scores` because
unused.
- Replace .values with .squeeze().values when calling a data frame with
only one column of data in :func:`olcaschema_genprocess`.
- Fix groupby for source_db in :func:`calculate_electricity_by_source` to
match the filter used to find multiple source entries.
- Add empty database check in :func:`calculate_electricity_by_source`
- Separate replace egrid function
- Fix zero division error in aggregate data
- Implement Hawkins-Young uncertainty
- Add uncertainty switch
- Drop NaNs in exchange table
Created:
2019-06-04
Last edited:
2024-08-13
"""
__all__ = [
"add_data_collection_score",
"add_technological_correlation_score",
"add_temporal_correlation_score",
"aggregate_data",
"aggregate_facility_flows",
"calculate_electricity_by_source",
"create_generation_process_df",
"eia_facility_fuel_region",
"hawkins_young",
"hawkins_young_sigma",
"hawkins_young_uncertainty",
"olcaschema_genprocess",
"replace_egrid",
"turn_data_to_dict",
]
##############################################################################
# FUNCTIONS
##############################################################################
def _calc_sigma(p_series):
    """Return the fitted Hawkins-Young sigma for a series of facility
    emission factors.

    Parameters
    ----------
    p_series : pandas.Series
        A series object sent during an aggregation or apply call.

    Returns
    -------
    float or None
        The fitted sigma for a Hawkins-Young uncertainty method at a
        90% confidence level; None when uncertainty calculation is
        disabled in the model config or when the fit reports an error.
    """
    # Guard clause: skip the fit entirely when uncertainty is off.
    if not model_specs.calculate_uncertainty:
        return None
    confidence = 0.9
    is_error, sigma = hawkins_young_sigma(p_series.values, confidence)
    if is_error:
        return None
    return sigma
def _calc_geom_params(p_series):
    """Location-adjusted geometric mean and standard deviation based on
    the Hawkins-Young uncertainty method.

    Parameters
    ----------
    p_series : pandas.Series
        A data series for aggregated (or disaggregated) emissions,
        including variables for 'uncertaintySigma' (as calculated
        by :func:`_calc_sigma`) and 'Emission_factor' (emission
        amounts per MWh).

    Returns
    -------
    tuple
        (geometric mean, geometric standard deviation); both are NaN
        when sigma is unavailable or the uncertainty calculation
        reports an error.
    """
    nan_pair = (float("nan"), float("nan"))
    sigma = p_series["uncertaintySigma"]
    # No fitted sigma means no uncertainty parameters can be derived.
    if sigma is None:
        return nan_pair
    result = hawkins_young_uncertainty(
        p_series["Emission_factor"], sigma, False)
    if result["error"]:
        return nan_pair
    return (result["mu_g"], result["sigma_g"])
def _wtd_mean(pdser, total_db):
"""The weighted mean method.
Parameters
----------
pdser : pandas.Series
A pandas series of numerical values.
Examples include correlation and data quality values.
total_db : pandas.DataFrame
A data frame with the same indices as the pandas series and
with a column, 'FlowAmount,' that represents the emission
amount used as the weighting factor (i.e., higher emissions
means more contribution towards the average).
Returns
-------
float or nan
The flow-amount-weighted average of values.
"""
try:
wts = total_db.loc[pdser.index, "FlowAmount"]
result = np.average(pdser, weights=wts)
except:
logging.debug(
f"Error calculating weighted mean for {pdser.name}-"
f"likely from 0 FlowAmounts"
)
try:
with np.errstate(all='raise'):
result = np.average(pdser)
except ArithmeticError or ValueError or FloatingPointError:
result = float("nan")
return result
def eia_facility_fuel_region(year):
    """Generate a data frame with EIA 860 and EIA 923 facility data.

    Calculates the percent of facility generation from the primary fuel
    category.

    Parameters
    ----------
    year : int
        The year associated with EIA data.

    Returns
    -------
    pandas.DataFrame
        Facility-level data from EIA Forms 860 and 923.
        Columns include:

        - 'FacilityID' : int
        - 'NAICS Code' : str (though they are integers)
        - 'FuelCategory' : str
        - 'PrimaryFuel' : str
        - 'PercentGenerationfromDesignatedFuelCategory' : float
        - 'State' : str
        - 'NERC' : str
        - 'Balancing Authority Code' : str
        - 'Balancing Authority Name' : str
    """
    logging.info(
        "Generating the percent generation from primary fuel category "
        "for each facility")
    fuel_df = eia923_primary_fuel(year=year)
    ba_df = eia860_balancing_authority(year)
    # Plant IDs must share a dtype for the merge key to line up.
    for frame in (fuel_df, ba_df):
        frame["Plant Id"] = frame["Plant Id"].astype(int)
    merged = fuel_df.merge(ba_df, on='Plant Id')
    # Convert from percentage (0-100) to fraction (0-1).
    merged['primary fuel percent gen'] = (
        merged['primary fuel percent gen'] / 100
    )
    column_map = {
        'primary fuel percent gen':
            'PercentGenerationfromDesignatedFuelCategory',
        'Plant Id': 'FacilityID',
        'fuel category': 'FuelCategory',
        'NERC Region': 'NERC',
    }
    return merged.rename(columns=column_map)
def add_technological_correlation_score(db):
    """Convert percent generation from primary fuel to a technological
    data quality indicator, where 1 represents >80 percent.

    See 'technological_correlation_lower_bound_to_dqi' in dqi.py for
    the score bounds.

    Parameters
    ----------
    db : pandas.DataFrame
        A data frame with 'PercentGenerationfromDesignatedFuelCategory'
        column with floats that represent the percent of plant
        generation that comes from the primary fuel category (i.e., how
        much of the generation is represented by the primary fuel
        category).

    Returns
    -------
    pandas.DataFrame
        The same data frame received with new column,
        'TechnologicalCorrelation', that represents the data quality
        based on the primary fuel categorization.
    """
    def _to_dqi(percent_gen):
        # Bin the percent-generation value into its DQI score.
        return lookup_score_with_bound_key(
            percent_gen, technological_correlation_lower_bound_to_dqi)

    db['TechnologicalCorrelation'] = db[
        'PercentGenerationfromDesignatedFuelCategory'].apply(_to_dqi)
    return db
def add_temporal_correlation_score(db, electricity_lci_target_year):
    """Generate columns in a data frame for data age and its quality
    score.

    Parameters
    ----------
    db : pandas.DataFrame
        A data frame with column 'Year' representing the data source
        year.
    electricity_lci_target_year : int
        The year associated with data use (see model_config attribute,
        'electricity_lci_target_year').

    Returns
    -------
    pandas.DataFrame
        The same data frame received with two new columns:

        - 'Age' : int, difference between target year and source year.
        - 'TemporalCorrelation' : int, DQI score based on age.
    """
    def _age_to_dqi(age):
        # Bin the data age into its DQI score.
        return lookup_score_with_bound_key(
            age, temporal_correlation_lower_bound_to_dqi)

    # Could be more precise here with year.
    db['Age'] = electricity_lci_target_year - pd.to_numeric(db['Year'])
    db['TemporalCorrelation'] = db['Age'].apply(_age_to_dqi)
    return db
def aggregate_facility_flows(df):
    """Aggregate flows from the same source (e.g., netl) within a facility.

    The main problem this solves is that if several emissions are mapped to a
    single federal elementary flow (e.g., CO2 biotic or CO2 land use change)
    then those show up as separate emissions in the inventory and artificially
    inflate the number of emissions for uncertainty calculations.

    This method sums all duplicated emissions together (taking the
    flow-amount-weighted average of their data reliability scores).

    Parameters
    ----------
    df : pandas.DataFrame
        A data frame with facility-level emissions that might contain duplicate
        emission species within the facility.

    Returns
    -------
    pandas.DataFrame
        The same data frame sent with duplicated emissions aggregated to a
        single row.
    """
    # Compartments treated as emissions; anything else (e.g., inputs)
    # is passed through untouched.
    emission_compartments = [
        "emission/air",
        "emission/water",
        "emission/ground",
        "emission/soil",
        "air",
        "water",
        "soil",
        "ground",
        "waste",
    ]
    # Rows matching on all of these columns are considered duplicates
    # of the same emission species within a facility.
    groupby_cols = [
        "FuelCategory",
        "FacilityID",
        "Electricity",
        "FlowName",
        "Source",
        "Compartment",
        "stage_code"
    ]
    # Weighted-mean aggregator: weights come from 'FlowAmount' in the
    # full (pre-split) data frame via index alignment.
    wm = lambda x: _wtd_mean(x, df)
    emissions = df["Compartment"].isin(emission_compartments)
    df_emissions = df[emissions]
    df_nonemissions = df[~emissions]
    # keep=False marks *all* members of a duplicate set, so df_red is
    # the emissions that need no aggregation.
    df_dupes = df_emissions.duplicated(subset=groupby_cols, keep=False)
    df_red = df_emissions.drop(df_emissions[df_dupes].index)
    # Sum the duplicated flow amounts; average their reliability.
    group_db = df_emissions.loc[df_dupes, :].groupby(
        groupby_cols, as_index=False
    ).agg({
        "FlowAmount": "sum",
        "DataReliability": wm
    })
    # Re-attach the remaining (non-aggregated) columns from one
    # representative row of each duplicate set.
    group_db_merge = group_db.merge(
        right=df_emissions.drop_duplicates(subset=groupby_cols),
        on=groupby_cols,
        how="left",
        suffixes=("", "_right"),
    )
    try:
        # Drop the representative row's stale copies of the
        # aggregated columns; the merged frame keeps the aggregates.
        delete_cols = ["FlowAmount_right", "DataReliability_right"]
        group_db_merge.drop(columns=delete_cols, inplace=True)
    except KeyError:
        logging.debug("Failed to drop columns.")
        pass
    # Recombine: non-emissions + unique emissions + aggregated dupes.
    df = pd.concat(
        [df_nonemissions, df_red, group_db_merge],
        ignore_index=True
    )
    return df
def _combine_sources(p_series, df, cols, source_limit=None):
"""Take the sources from a groupby.apply and return a list that
contains one column containing a list of the sources and another
that concatenates them into a string. This is all in an effort to find
another approach for summing electricity for all plants in an aggregation
that matches the same data sources.
Parameters
----------
p_series : pandas.Series
A column of source strings from inventory data frame.
df: pandas.DataFrame
Dataframe containing merged generation and emissions data; it includes
a column for data source (e.g., eGRID, NEI, and RCRAInfo).
cols : list
Unused column list, except for debugging statement.
source_limit : int, optional
The maximum number of sources allowed to be found.
Defaults to none.
Returns
----------
list
A list of length two.
1. The first item is a list of all sources or nan.
2. The second item is a string of concatenated sources or nan.
"""
logging.debug(
f"Combining sources for {str(df.loc[p_series.index[0],cols].values)}"
)
source_list = list(np.unique(p_series))
if source_limit is not None:
if len(source_list) > source_limit:
result = [float("nan"), float("nan")]
return result
else:
source_list.sort()
source_list_string = "_".join(source_list)
result = [source_list, source_list_string]
return result
else:
source_list.sort()
# HOTFIX: rm redundant calls [2023-11-08; TWD]
source_list_string = "_".join(source_list)
result = [source_list, source_list_string]
return result
def add_data_collection_score(db, elec_df, subregion="BA"):
    """Add the data collection score.

    This is a function of how much of the total electricity generated in a
    subregion is captured by the denominator used in the final emission factor.

    NOTE: this function also mutates the received data frame in place
    (it assigns the 'DataCollection' column directly on `db` before
    taking the dropped-column copy that is returned).

    Parameters
    ----------
    db : datafrane
        Dataframe containing facility-level emissions as generated by
        create_generation_process_df.
    elec_df : dataframe
        Dataframe containing the totals for various subregion/source
        combinations. These are used as the denominators in the emissions
        factors
    subregion : str, optional
        The level of subregion that the data will be aggregated to. Choices
        are 'all', 'NERC', 'BA', 'US', by default 'BA'

    Returns
    -------
    pandas.DataFrame
        The same data frame received with a new 'DataCollection' column
        holding the DQI score for generation coverage.
    """
    logging.info("Adding data collection score")
    # Region column(s) for the chosen aggregation level (None for 'US').
    region_agg = subregion_col(subregion)
    fuel_agg = ["FuelCategory"]
    if region_agg:
        groupby_cols = region_agg + fuel_agg + ["Year"]
    else:
        groupby_cols = fuel_agg + ["Year"]
    # Align each row with its source-specific electricity total
    # (the emission factor denominator, 'electricity_sum').
    temp_df = db.merge(
        right=elec_df,
        left_on=groupby_cols + ["source_string"],
        right_on=groupby_cols + ["source_string"],
        how="left",
    )
    # One row per facility per group so generation is not double-counted.
    reduced_db = db.drop_duplicates(subset=groupby_cols + ["eGRID_ID"])
    # Total electricity generated in each region/fuel/year group.
    region_elec = reduced_db.groupby(groupby_cols, as_index=False)[
        "Electricity"
    ].sum()
    region_elec.rename(
        columns={"Electricity": "region_fuel_electricity"}, inplace=True
    )
    temp_df = temp_df.merge(
        right=region_elec,
        left_on=groupby_cols,
        right_on=groupby_cols,
        how="left",
    )
    # Fraction of regional generation covered by the EF denominator.
    # NOTE: relies on temp_df preserving db's row order (left merges).
    db["Percent_of_Gen_in_EF_Denominator"] = (
        temp_df["electricity_sum"] / temp_df["region_fuel_electricity"]
    )
    db["DataCollection"] = db["Percent_of_Gen_in_EF_Denominator"].apply(
        lambda x: lookup_score_with_bound_key(
            x, data_collection_lower_bound_to_dqi
        )
    )
    # Drop the intermediate fraction; only the DQI score is kept.
    db = db.drop(columns="Percent_of_Gen_in_EF_Denominator")
    return db
def calculate_electricity_by_source(db, subregion="BA"):
    """Calculate the electricity totals by region and source.

    This method uses the same approach as the original generation.py with
    attempts made to speed it up. Each flow will have a source associated
    with it (eGRID, NEI, TRI, RCRAInfo). To develop an emission factor,
    the FlowAmount will need to be divided by electricity generation.
    This routine sums all electricity generation for all source/subregion
    combinations. So if a subregion aggregates FlowAmounts source from NEI and
    TRI then the denominator will be all production from plants that reported
    into NEI or TRI for that subregion.

    Parameters
    ----------
    db : pandas.DataFrame
        Dataframe containing facility-level emissions as generated by
        create_generation_process_df.
    subregion : str, optional
        The level of subregion that the data will be aggregated to. Choices
        are 'all', 'NERC', 'BA', 'US', by default 'BA'

    Returns
    -------
    tuple
        pandas.DataFrame :
            Inventory dataframe with source list and source string fields.
        pandas.DataFrame :
            The calculation of average and total electricity for each source
            along with the facility count for each source.
    """
    # Combined-source string (e.g. "eGRID_NEI_TRI") used for non-power
    # stages and as a catch-all electricity total.
    all_sources = '_'.join(sorted(list(db["Source"].unique())))
    power_plant_criteria = db["stage_code"]=="Power plant"
    db_powerplant = db.loc[power_plant_criteria, :].copy()
    db_nonpower = db.loc[~power_plant_criteria, :].copy()
    region_agg = subregion_col(subregion)
    fuel_agg = ["FuelCategory"]
    if region_agg:
        groupby_cols = (
            region_agg
            + fuel_agg
            + ["Year", "stage_code", "FlowName", "Compartment"]
        )
        elec_groupby_cols = region_agg + fuel_agg + ["Year"]
    else:
        groupby_cols = fuel_agg + [
            "Year",
            "stage_code",
            "FlowName",
            "Compartment",
        ]
        elec_groupby_cols = fuel_agg + ["Year"]
    # HOTFIX: add check for empty powerplant data frame [2023-12-19; TWD]
    if len(db_powerplant) == 0:
        db_cols = list(db_powerplant.columns) + ['source_list', 'source_string']
        db_powerplant = pd.DataFrame(columns=db_cols)
    else:
        # This is a pretty expensive process when we have to start looking
        # at each flow generated in each compartment for each balancing
        # authority area. To hopefully speed this up, we'll group by FlowName
        # and Compartment and look and try to eliminate flows where all
        # sources are single entities.
        combine_source_by_flow = lambda x: _combine_sources(
            x, db, ["FlowName", "Compartment"], 1
        )
        # Find all single-source flows (all multiple sources are nans)
        source_df = pd.DataFrame(
            db_powerplant.groupby(["FlowName", "Compartment"])[
                ["Source"]].apply(combine_source_by_flow),
            columns=["source_list"],
        )
        # Split the [list, string] pairs returned by _combine_sources
        # into two separate columns.
        source_df[["source_list", "source_string"]] = pd.DataFrame(
            source_df["source_list"].values.tolist(),
            index=source_df.index
        )
        source_df.reset_index(inplace=True)
        # Preserve the original index across the merge so that later
        # .loc-based assignment aligns row-for-row.
        old_index = db_powerplant.index
        db_powerplant = db_powerplant.merge(
            right=source_df,
            left_on=["FlowName", "Compartment"],
            right_on=["FlowName", "Compartment"],
            how="left",
        )
        db_powerplant.index = old_index
        # Filter out single flows; leaving only multi-flows
        db_multiple_sources = db_powerplant.loc[
            db_powerplant["source_string"].isna(), :].copy()
        if len(db_multiple_sources) > 0:
            # Second pass: no source limit, so multi-source flows now
            # get their full combined-source strings.
            combine_source_lambda = lambda x: _combine_sources(
                x, db_multiple_sources, groupby_cols
            )
            # HOTFIX: it doesn't make sense to groupby a different group;
            # it gives different results from the first-pass filter;
            # changed to match criteria above. [2023-12-19; TWD]
            source_df = pd.DataFrame(
                db_multiple_sources.groupby(["FlowName", "Compartment"])[
                    ["Source"]].apply(combine_source_lambda),
                columns=["source_list"],
            )
            source_df[["source_list", "source_string"]] = pd.DataFrame(
                source_df["source_list"].values.tolist(),
                index=source_df.index
            )
            source_df.reset_index(inplace=True)
            # Drop the first-pass NaN columns before re-merging.
            db_multiple_sources.drop(
                columns=["source_list", "source_string"], inplace=True
            )
            old_index = db_multiple_sources.index
            db_multiple_sources = db_multiple_sources.merge(
                right=source_df,
                left_on=["FlowName", "Compartment"],
                right_on=["FlowName", "Compartment"],
                how="left",
            )
            db_multiple_sources.index = old_index
            # Write the second-pass results back into the NaN slots;
            # alignment relies on the restored indices above.
            db_powerplant.loc[
                db_powerplant["source_string"].isna(),
                ["source_string", "source_list"]
            ] = db_multiple_sources[["source_string", "source_list"]]
    # Electricity totals are computed per unique source combination,
    # plus the all-sources combination used for non-power stages.
    unique_source_lists = list(db_powerplant["source_string"].unique())
    unique_source_lists = [x for x in unique_source_lists if str(x) != "nan"]
    unique_source_lists += [all_sources]
    # One set of emissions passed into this routine may be life cycle emissions
    # used as proxies for Canadian generation. In those cases the electricity
    # generation will be equal to the Electricity already in the dataframe.
    elec_sum_lists = list()
    for src in unique_source_lists:
        logging.info(f"Calculating electricity for {src}")
        db["temp_src"] = src
        # Substring membership: a facility's single source (e.g. "NEI")
        # matches any combined string containing it (e.g. "NEI_TRI").
        src_filter = [
            a in b
            for a, b in zip(
                db["Source"].values.tolist(), db["temp_src"].values.tolist()
            )
        ]
        sub_db = db.loc[src_filter, :].copy()
        # One row per facility so generation is not double-counted.
        sub_db.drop_duplicates(subset=fuel_agg + ["eGRID_ID"], inplace=True)
        # HOTFIX: fix pandas futurewarning syntax [2024-03-08; TWD]
        sub_db_group = sub_db.groupby(elec_groupby_cols, as_index=False).agg(
            {"Electricity": ["sum", "mean"], "eGRID_ID": "count"}
        )
        # Flatten the MultiIndex column labels produced by .agg.
        sub_db_group.columns = elec_groupby_cols + [
            "electricity_sum",
            "electricity_mean",
            "facility_count",
        ]
        sub_db_group["source_string"] = src
        elec_sum_lists.append(sub_db_group)
    db_nonpower["source_string"] = all_sources
    db_nonpower["source_list"] = [all_sources]*len(db_nonpower)
    elec_sums = pd.concat(elec_sum_lists, ignore_index=True)
    elec_sums.sort_values(by=elec_groupby_cols, inplace=True)
    db = pd.concat([db_powerplant, db_nonpower])
    return db, elec_sums
def get_generation_years():
    """Create list of generation years based on model configuration.

    Reads the model specs for inventories of interest, generation year,
    and (if renewables are included) the hydro power plant data year.

    Returns
    -------
    list
        A list of years (int), de-duplicated and sorted
        chronologically.
    """
    years = set(model_specs.inventories_of_interest.values())
    years.add(model_specs.eia_gen_year)
    # Hydro power plant data, when included, are always for 2016.
    if model_specs.include_renewable_generation is True:
        years.add(2016)
    return sorted(years)
def get_facilities_w_fuel_region(years=None):
    """Capture all facility fuels and regions for a given set of years.

    Later years take precedence: for a facility appearing in multiple
    years, the newest non-null values win and older values gap-fill.

    Parameters
    ----------
    years : list, optional
        List of years, by default None (which pulls the years from the
        model configuration via :func:`get_generation_years`).
        A scalar year (int, float, or str) is also accepted.

    Returns
    -------
    pandas.DataFrame
        A data frame with columns

        - 'FacilityID' (int): plant identifier
        - 'FuelCategory' (str): primary fuel category
        - 'PrimaryFuel' (str): primary fuel code
        - 'PercentGenerationfromDesignatedFuelCategory' (float)
        - 'State' (str): two-character state code
        - 'NERC' (str): NERC region code
        - 'Balancing Authority Code' (str)
        - 'Balancing Authority Name' (str)

    Raises
    ------
    ValueError
        If an empty list of years is given.
    """
    if years is None:
        years = get_generation_years()
    if isinstance(years, (int, float, str)):
        years = [years,]
    # HOTFIX: an empty list previously fell through to an unbound-name
    # NameError on return; fail fast with a clear message instead.
    if not years:
        raise ValueError("At least one year is required")
    combined = eia_facility_fuel_region(years[0])
    for year in years[1:]:
        latest = eia_facility_fuel_region(year)
        # This appends a suffix on the old data, and gap fills
        # new data with old data that are not found in the new data.
        # Source: https://stackoverflow.com/a/69504041
        combined = combined.merge(
            latest,
            how='outer',
            on='FacilityID',
            suffixes=('_df1', '')
        )
        for col_name in latest.columns:
            old_name = col_name + "_df1"
            if old_name in combined.columns:
                # Fill in new column's NaNs with old data:
                combined[col_name] = combined[col_name].fillna(
                    combined[old_name])
                combined.drop(columns=old_name, inplace=True)
    return combined
def create_generation_process_df():
"""Read emissions and generation data from different sources to provide
facility-level emissions. Most important inputs to this process come
from the model configuration file.
Maps balancing authorities to FERC and EIA regions.
Returns
----------
pandas.DataFrame
Data frame includes all facility-level emissions.
"""
from electricitylci.combinator import BA_CODES
COMPARTMENT_DICT = {
"emission/air": "air",
"emission/water": "water",
"emission/ground": "ground",
"input": "input",
"output": "output",
"waste": "waste",
"air": "air",
"water": "water",
"ground": "ground",
}
if model_specs.replace_egrid:
# Create data frame with EIA's info on:
# - 'FacilityID' (int),
# - 'Electricity' (float), and
# - 'Year' (int)
# NOTE: this may return multi-year facilities
generation_data = build_generation_data().drop_duplicates()
# Pull list of unique facilities from all generation years of interest
eia_facilities_to_include = generation_data["FacilityID"].unique()
# Create the file name for reading/writing facility matcher data.
inventories_of_interest_list = sorted([
f"{x}_{model_specs.inventories_of_interest[x]}"
for x in model_specs.inventories_of_interest.keys()
])
inventories_of_interest_str = "_".join(inventories_of_interest_list)
# NOTE: data pulled from Facility Register Service (FRS) program
# provided by USEPA's FacilityMatcher, now a part of StEWI.
# https://github.com/USEPA/standardizedinventories
try:
eia860_FRS = pd.read_csv(
f"{paths.local_path}/FRS_bridges/"
f"{inventories_of_interest_str}.csv")
logging.info(
"Got EIA860 to FRS ID matches from existing file")
eia860_FRS["REGISTRY_ID"] = eia860_FRS["REGISTRY_ID"].astype(str)
except FileNotFoundError:
logging.info(
"Will need to load EIA860 to FRS matches using stewi "
"facility matcher - it may take a while to download "
"and read the required data")
file_ = fmglob.FRS_config['FRS_bridge_file']
col_dict = {
'REGISTRY_ID': "str",
'PGM_SYS_ACRNM': "str",
'PGM_SYS_ID': "str"
}
FRS_bridge = fmglob.read_FRS_file(file_, col_dict)
eia860_FRS = fmglob.filter_by_program_list(
df=FRS_bridge, program_list=["EIA-860"]
)
# Define file paths
frs_dir = os.path.join(f"{paths.local_path}", "FRS_bridges")
frs_csv = f"{inventories_of_interest_str}.csv"
frs_path = os.path.join(frs_dir, frs_csv)
# Ensure output folder exists
set_dir(frs_dir)
# Save a local copy
write_csv_to_output(frs_path, eia860_FRS)
# emissions_and_wastes_by_facility is a StEWICombo inventory based on
# inventories of interest (e.g., eGRID, RCRAInfo, NEI) and their
# respective years as defined in the model config.
# Columns in the emissions_and_wastes_by_facility include
# FacilityID and FRS_ID (the latter links to REGISTRY_ID in FRS).
# This effectively adds 'PGM_SYS_ID', which are the EIA facility
# numbers and maps them to eGRID facility numbers.
# NOTE: there are unmatched facilities that are found in FRS_bridge,
# but not in EIA (e.g., EGRID, RCRA).
ewf_df = pd.merge(
left=emissions_and_wastes_by_facility,
right=eia860_FRS,
left_on="FRS_ID",
right_on="REGISTRY_ID",
how="left",
)
# Effectively removes all non-EIA facilities from StEWICombo inventory.
# drops 909 rows in 2022 inventory
ewf_df.dropna(subset=["PGM_SYS_ID"], inplace=True)
# Drop unused columns; note legacy column names are still here.
d_cols = [
"NEI_ID",
"FRS_ID",
"TRI_ID",
"RCRAInfo_ID",
"PGM_SYS_ACRNM",
"REGISTRY_ID"
]
d_cols = [x for x in d_cols if x in ewf_df.columns]
if len(d_cols) > 0:
ewf_df.drop(columns=d_cols, inplace=True)
# Convert facility ID to integer for comparisons.
ewf_df["FacilityID"] = ewf_df["PGM_SYS_ID"].astype(int)
# Filter stewi inventory to just (EIA) facilities of interest.
# HOTFIX: SettingWithCopyWarning [2024-03-12; TWD]
eaw_for_select_eia_facilities = ewf_df[
ewf_df["FacilityID"].isin(eia_facilities_to_include)].copy()
# HOTFIX: "eGRID_ID" column already appears
if "eGRID_ID" in eaw_for_select_eia_facilities.columns:
eaw_for_select_eia_facilities.drop(columns="eGRID_ID", inplace=True)
eaw_for_select_eia_facilities.rename(
columns={"FacilityID": "eGRID_ID"}, inplace=True)
# Read in EPA's CEMS state-level data
# NOTE: reads in all facility data, including those 99999 facilities
# that were filtered out w/ NAICS code filtering. These facilities
# are re-filtered later on during a merge with generation data.
cems_df = ampd.generate_plant_emissions(model_specs.eia_gen_year)
# Correct StEWI emissions
emissions_df = em_other.integrate_replace_emissions(
cems_df, eaw_for_select_eia_facilities
)
# Read EIA 860/923 facility info (e.g., PrimaryFuel and percent of
# generation from designated fuel category).
# HOTFIX: gather "the best" facility fuel and location data across
# all inventory years [240809; TWD].
facilities_w_fuel_region = get_facilities_w_fuel_region()
facilities_w_fuel_region.rename(
columns={'FacilityID': 'eGRID_ID'},
inplace=True
)
else:
# Load list; only works when not replacing eGRID!
from electricitylci.generation_mix import egrid_facilities_w_fuel_region
from electricitylci.egrid_filter import (
electricity_for_selected_egrid_facilities,
emissions_and_waste_for_selected_egrid_facilities,
)
# HOTFIX: avoid overwriting the global variable by using a copy
# NOTE: egrid_facilities_with_fuel_region is the same as
# egrid_facilities
facilities_w_fuel_region = egrid_facilities_w_fuel_region.copy()
facilities_w_fuel_region["FacilityID"] = \
facilities_w_fuel_region["FacilityID"].astype(int)
facilities_w_fuel_region.rename(
columns={'FacilityID': 'eGRID_ID'},
inplace=True)
generation_data = electricity_for_selected_egrid_facilities.copy()
generation_data["Year"] = model_specs.egrid_year
generation_data["FacilityID"] = \
generation_data["FacilityID"].astype(int)
emissions_df = emissions_and_waste_for_selected_egrid_facilities.copy()
emissions_df["eGRID_ID"] = emissions_df["eGRID_ID"].astype(int)
# HOTFIX: ValueError w/ Year as string and integer [2023-12-22; TWD]
emissions_df['Year'] = emissions_df['Year'].astype(int)
generation_data['Year'] = generation_data['Year'].astype(int)
generation_data.rename(columns={'FacilityID': 'eGRID_ID'}, inplace=True)
# Match electricity generation data (generation_data) to their facility
# emissions inventory (emissions_df) by year.
# HOTFIX: Change how to 'inner' to ensure that plants that have been
# filtered out are not included (e.g., by NAICS) [3/4/2024; M. Jamieson]
final_database = pd.merge(
left=emissions_df,
right=generation_data,
on=["eGRID_ID", "Year"],
how="inner",
)
# Add facility-level info to the emissions and generation data.
# NOTE some failed-to-match facilities with location exist.
# This is likely due to 'facilities_w_fuel_region' being associated with
# the EIA generation year, whilst the data are from several vintages.
final_database = pd.merge(
left=final_database,
right=facilities_w_fuel_region,
on="eGRID_ID",
how="left",
suffixes=["", "_right"],
)
if model_specs.replace_egrid:
# Get EIA primary fuel categories (and their percent generation);
# The data are the same as from EIA's `eia_facility_fuel_region`,
# but with additional facilities.
primary_fuel_df = eia923_primary_fuel(year=model_specs.eia_gen_year)
primary_fuel_df.rename(
columns={'Plant Id': "eGRID_ID"},
inplace=True
)
primary_fuel_df["eGRID_ID"] = primary_fuel_df["eGRID_ID"].astype(int)
# Produce a data frame of plant ID to fuel category for mapping
# NOTE: drop duplicates should not be necessary;
# passed checks 2016, 2020, 2022 [240809; TWD]
key_df = (
primary_fuel_df[["eGRID_ID", "FuelCategory"]]
.dropna().drop_duplicates().set_index("eGRID_ID")
)
# Fills some, but not all.
final_database["FuelCategory"] = final_database["eGRID_ID"].map(
key_df["FuelCategory"])
else:
# Attempt to use facility data to match NaNs.
key_df = (
final_database[["eGRID_ID", "FuelCategory"]]
.dropna().drop_duplicates().set_index("eGRID_ID")
)
final_database.loc[
final_database["FuelCategory"].isnull(), "FuelCategory"
] = final_database.loc[
final_database["FuelCategory"].isnull(), "eGRID_ID"
].map(
key_df["FuelCategory"]
)
final_database["Final_fuel_agg"] = final_database["FuelCategory"]
if 'Year_x' in final_database.columns:
year_filter = final_database["Year_x"] == final_database["Year_y"]
final_database = final_database.loc[year_filter, :]
final_database.drop(columns="Year_y", inplace=True)
final_database.rename(columns={"Year_x": "Year"}, inplace=True)
# Use the Federal Elementary Flow List (FEDEFL) to map flow UUIDs
# NOTE: 10,000 unmatched flows; mostly wastes and product flows
final_database = map_emissions_to_fedelemflows(final_database)
# Sanity check that no duplicated columns exist in the data frame.
final_database = final_database.loc[
:, ~final_database.columns.duplicated()
]
# Sanity check that no duplicate emission rows are in the data frame.
dup_cols_check = [
"eGRID_ID",
"FuelCategory",
"FlowName",
"FlowAmount",
"Compartment",
]
final_database = final_database.drop_duplicates(subset=dup_cols_check)
drop_columns = ['PrimaryFuel_right', 'FuelCategory', 'FuelCategory_right']
drop_columns = [c for c in drop_columns if c in final_database.columns]
final_database.drop(columns=drop_columns, inplace=True)
final_database.rename(
columns={"Final_fuel_agg": "FuelCategory"},
inplace=True,
)
# Add DQI