diff --git a/egret/model_library/transmission/tests/test_tx_utils.py b/egret/model_library/transmission/tests/test_tx_utils.py
index 13db9c0d..eec6f8f0 100644
--- a/egret/model_library/transmission/tests/test_tx_utils.py
+++ b/egret/model_library/transmission/tests/test_tx_utils.py
@@ -457,3 +457,10 @@ def test_poly_cubic(self):
                                        p_max=85,
                                        gen_name='foo',
                                        t=None)
+
+class TestTxUtils(unittest.TestCase):
+
+    def test_element_types(self):
+        etypes = list(tx_utils.element_types())
+        self.assertNotEqual(len(etypes), 0)
+        self.assertEqual(len(etypes), len(set(etypes)))
diff --git a/egret/model_library/transmission/tx_utils.py b/egret/model_library/transmission/tx_utils.py
index 5feb2a7d..9571ed20 100644
--- a/egret/model_library/transmission/tx_utils.py
+++ b/egret/model_library/transmission/tx_utils.py
@@ -370,6 +370,8 @@ def load_shed_limit(load, gens, gen_mins):
                            'pl',
                            'ql',
                           ],
+      ('element_type', 'security_constraint', None) : [
+                          ],
       ('element_type', 'security_constraint', 'pg') : [
                            'lower_bound',
                            'upper_bound',
                            'violation_penalty',
@@ -381,6 +383,8 @@ def load_shed_limit(load, gens, gen_mins):
                            'pf',
                            'pf_violation',
                           ],
+      ('element_type', 'fuel_supply', None) : [
+                          ],
       ('system_attributes', None, None ) : [
                            'load_mismatch_cost',
                            'q_load_mismatch_cost',
@@ -389,6 +393,11 @@ def load_shed_limit(load, gens, gen_mins):
                            ancillary_service_stack,
                           }
 
+def element_types():
+    ''' Get an iterable that yields each valid egret element type as a string
+    '''
+    return (key[1] for key in scaled_attributes.keys()
+            if key[0] == 'element_type' and key[2] is None)
 
 def scale_ModelData_to_pu(model_data, inplace=False):
     return _convert_modeldata_pu(model_data, _divide_by_baseMVA, inplace)
diff --git a/egret/parsers/rts_gmlc/parsed_cache.py b/egret/parsers/rts_gmlc/parsed_cache.py
index 44dd9548..0c62abc1 100644
--- a/egret/parsers/rts_gmlc/parsed_cache.py
+++ b/egret/parsers/rts_gmlc/parsed_cache.py
@@ -94,6 +94,70 @@ def populate_skeleton_with_data(self, skeleton_dict:dict, simulation_type:str,
         self._process_timeseries_data(skeleton_dict, simulation_type, begin_time, end_time)
         self._insert_system_data(skeleton_dict, simulation_type, begin_time, end_time)
 
+    def get_timeseries_locations(self, simulation_type:str, md:dict) -> Iterable[Tuple[dict, str]]:
+        ''' Get all locations in the provided model with a defined time series.
+
+        Returns
+        -------
+        Each location is returned as a dict and the name of a key within the dict.
+
+        Remarks
+        -------
+        This method returns time series locations as specified in the RTS-GMLC input that created
+        this cache.  It only returns locations the rts_gmlc parser knows how to map from the input
+        to a location in the Egret model; other time series, if any, are skipped.
+        '''
+
+        df = self.timeseries_df
+
+        system = md.data['system']
+        loads = md.data['elements']['load']
+        generators = md.data['elements']['generator']
+        areas = md.data['elements']['area']
+
+        sim_col = df.columns.get_loc('Simulation')
+        cat_col = df.columns.get_loc('Category')
+        obj_col = df.columns.get_loc('Object')
+        param_col = df.columns.get_loc('Parameter')
+
+        # Go through each timeseries value for this simulation type
+        for i in range(self._first_indices[simulation_type], len(df)):
+            if df.iat[i, sim_col] != simulation_type:
+                break
+
+            category = df.iat[i, cat_col]
+
+            if category == 'Generator':
+                gen_name = df.iat[i, obj_col]
+                param = df.iat[i, param_col]
+
+                if param == 'PMin MW':
+                    yield (generators[gen_name], 'p_min')
+                elif param == 'PMax MW':
+                    yield (generators[gen_name], 'p_max')
+
+            elif category == 'Area':
+                area_name = df.iat[i, obj_col]
+                param = df.iat[i, param_col]
+                assert(param == "MW Load")
+                for l_d in loads.values():
+                    # Skip loads from other areas
+                    if l_d['area'] != area_name:
+                        continue
+                    yield (l_d, 'p_load')
+                    yield (l_d, 'q_load')
+
+            elif category == 'Reserve':
+                res_name = df.iat[i, obj_col]
+                if res_name in reserve_name_map:
+                    yield (system, reserve_name_map[res_name])
+                else:
+                    # reserve name must be <type>_R<area>,
+                    # split into type and area
+                    res_name, area_name = res_name.split("_R", 1)
+                    yield (areas[area_name], reserve_name_map[res_name])
+
+
     def _process_timeseries_data(self, md:dict, simulation_type:str,
                                  begin_time:datetime, end_time:datetime) -> None:
         df = self.timeseries_df
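A brief usage sketch of the two helpers added by this patch (not part of the diff; the "cache" and "md" names, the 'DAY_AHEAD' simulation type string, and the placeholder time-series dict are illustrative assumptions rather than APIs confirmed by this change):

    from egret.model_library.transmission import tx_utils

    # Enumerate each valid Egret element type, e.g. 'generator', 'load', ...
    for etype in tx_utils.element_types():
        print(etype)

    # Assuming "cache" is a parsed-cache object built by the rts_gmlc parser and
    # "md" is an Egret ModelData skeleton, visit every location the parser knows
    # how to populate for the chosen simulation type and attach a placeholder series.
    for location, key in cache.get_timeseries_locations('DAY_AHEAD', md):
        location[key] = {'data_type': 'time_series', 'values': []}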