diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 0fdee7574..d856851f2 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -44,6 +44,7 @@ - Fallback components are used in generated formulas. If primary components is unavailable, formula will generate metric from fallback components. Fallback formulas are implemented for: - PVPowerFormula + - ProducerPowerFormula ## Enhancements diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_producer_power_formula.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_producer_power_formula.py index a0433192e..b298a47e4 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_producer_power_formula.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_producer_power_formula.py @@ -4,13 +4,20 @@ """Formula generator from component graph for Producer Power.""" import logging +from typing import Callable -from frequenz.client.microgrid import ComponentCategory, ComponentMetricId +from frequenz.client.microgrid import Component, ComponentCategory, ComponentMetricId from ....microgrid import connection_manager from ..._quantities import Power from .._formula_engine import FormulaEngine -from ._formula_generator import NON_EXISTING_COMPONENT_ID, FormulaGenerator +from ._fallback_formula_metric_fetcher import FallbackFormulaMetricFetcher +from ._formula_generator import ( + NON_EXISTING_COMPONENT_ID, + ComponentNotFound, + FormulaGenerator, + FormulaGeneratorConfig, +) _logger = logging.getLogger(__name__) @@ -43,35 +50,111 @@ def generate( # noqa: DOC502 ) component_graph = connection_manager.get().component_graph - # if in the future we support additional producers, we need to add them to the lambda - producer_components = component_graph.dfs( - self._get_grid_component(), - set(), - lambda component: component_graph.is_pv_chain(component) - or component_graph.is_chp_chain(component), - ) - - if not producer_components: - _logger.warning( - "Unable to find any producer components in the component graph. " - "Subscribing to the resampling actor with a non-existing " - "component id, so that `0` values are sent from the formula." + if self._config.component_ids is None: + # if in the future we support additional producers, we need to add them to the lambda + producer_components = component_graph.dfs( + self._get_grid_component(), + set(), + lambda component: component_graph.is_pv_chain(component) + or component_graph.is_chp_chain(component), ) - # If there are no producer components, we have to send 0 values at the same - # frequency as the other streams. So we subscribe with a non-existing - # component id, just to get a `None` message at the resampling interval. - builder.push_component_metric( - NON_EXISTING_COMPONENT_ID, nones_are_zeros=True + + if not producer_components: + _logger.warning( + "Unable to find any producer components in the component graph. " + "Subscribing to the resampling actor with a non-existing " + "component id, so that `0` values are sent from the formula." + ) + # If there are no producer components, we have to send 0 values at the same + # frequency as the other streams. So we subscribe with a non-existing + # component id, just to get a `None` message at the resampling interval. + builder.push_component_metric( + NON_EXISTING_COMPONENT_ID, nones_are_zeros=True + ) + return builder.build() + + else: + producer_components = component_graph.components( + component_ids=set(self._config.component_ids) ) - return builder.build() + if len(producer_components) != len(self._config.component_ids): + raise ComponentNotFound( + "Unable to find all requested producer components." + f"Requested {self._config.component_ids}, " + f" found {producer_components}." + ) + + is_not_meter: Callable[[Component], bool] = ( + lambda component: component.category != ComponentCategory.METER + ) + + if self._config.allow_fallback: + fallbacks = self._get_fallback_formulas(producer_components) + + for idx, (primary_component, fallback_formula) in enumerate( + fallbacks.items() + ): + if idx > 0: + builder.push_oper("+") + + # should only be the case if the component is not a meter + builder.push_component_metric( + primary_component.component_id, + nones_are_zeros=is_not_meter(primary_component), + fallback=fallback_formula, + ) + else: + for idx, component in enumerate(producer_components): + if idx > 0: + builder.push_oper("+") + + builder.push_component_metric( + component.component_id, + nones_are_zeros=is_not_meter(component), + ) + + return builder.build() - for idx, component in enumerate(producer_components): - if idx > 0: - builder.push_oper("+") + def _get_fallback_formulas( + self, components: set[Component] + ) -> dict[Component, FallbackFormulaMetricFetcher[Power] | None]: + """Find primary and fallback components and create fallback formulas. - builder.push_component_metric( - component.component_id, - nones_are_zeros=component.category != ComponentCategory.METER, + The primary component is the one that will be used to calculate the producer power. + But if it is not available, the fallback formula will be used instead. + Fallback formulas calculates the producer power using the fallback components. + Fallback formulas are wrapped in `FallbackFormulaMetricFetcher`. + + Args: + components: The producer components. + + Returns: + A dictionary mapping primary components to their FallbackFormulaMetricFetcher. + """ + fallbacks = self._get_metric_fallback_components(components) + + fallback_formulas: dict[ + Component, FallbackFormulaMetricFetcher[Power] | None + ] = {} + + for primary_component, fallback_components in fallbacks.items(): + if len(fallback_components) == 0: + fallback_formulas[primary_component] = None + continue + + fallback_ids = [c.component_id for c in fallback_components] + generator = ProducerPowerFormula( + f"{self._namespace}_fallback_{fallback_ids}", + self._channel_registry, + self._resampler_subscription_sender, + FormulaGeneratorConfig( + component_ids=set(fallback_ids), + allow_fallback=False, + ), ) - return builder.build() + fallback_formulas[primary_component] = FallbackFormulaMetricFetcher( + generator + ) + + return fallback_formulas diff --git a/src/frequenz/sdk/timeseries/producer.py b/src/frequenz/sdk/timeseries/producer.py index 22ddf4b61..671a18479 100644 --- a/src/frequenz/sdk/timeseries/producer.py +++ b/src/frequenz/sdk/timeseries/producer.py @@ -12,7 +12,10 @@ from ._quantities import Power from .formula_engine import FormulaEngine from .formula_engine._formula_engine_pool import FormulaEnginePool -from .formula_engine._formula_generators import ProducerPowerFormula +from .formula_engine._formula_generators import ( + FormulaGeneratorConfig, + ProducerPowerFormula, +) class Producer: @@ -91,6 +94,9 @@ def power(self) -> FormulaEngine[Power]: engine = self._formula_pool.from_power_formula_generator( "producer_power", ProducerPowerFormula, + FormulaGeneratorConfig( + allow_fallback=True, + ), ) assert isinstance(engine, FormulaEngine) return engine diff --git a/tests/timeseries/test_producer.py b/tests/timeseries/test_producer.py index c975352af..c881ddabb 100644 --- a/tests/timeseries/test_producer.py +++ b/tests/timeseries/test_producer.py @@ -60,6 +60,7 @@ async def test_producer_power_no_pv_no_consumer_meter( producer_power_receiver = producer.power.new_receiver() await mockgrid.mock_resampler.send_chp_power([2.0]) + assert (await producer_power_receiver.receive()).value == Power.from_watts( 2.0 ) @@ -94,3 +95,83 @@ async def test_no_producer_power(self, mocker: MockerFixture) -> None: assert (await producer_power_receiver.receive()).value == Power.from_watts( 0.0 ) + + async def test_producer_fallback_formula(self, mocker: MockerFixture) -> None: + """Test the producer power formula with fallback formulas.""" + mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) + mockgrid.add_solar_inverters(2) + # CHP has no meter, so no fallback component + mockgrid.add_chps(1, no_meters=True) + + async with mockgrid, AsyncExitStack() as stack: + producer = microgrid.producer() + stack.push_async_callback(producer.stop) + producer_power_receiver = producer.power.new_receiver() + + # Note: ProducerPowerFormula has a "nones-are-zero" rule, that says: + # * if the meter value is None, it should be treated as None. + # * for other components None is treated as 0. + + # fmt: off + expected_input_output: list[ + tuple[list[float | None], list[float | None], list[float | None], Power | None] + ] = [ + # ([pv_meter_power], [pv_inverter_power], [chp_power], expected_power) + # Add power from meters and chp + ([-1.0, -2.0], [None, -200.0], [300], Power.from_watts(297.0)), + ([-1.0, -10], [-100.0, -200.0], [400], Power.from_watts(389.0)), + # Case 2: The first meter is unavailable (None). + # Subscribe to the fallback inverter, but return None as the result, + # according to the "nones-are-zero" rule + ([None, -2.0], [-100, -200.0], [400], None), + # Case 3: First meter is unavailable (None). Fallback inverter provides + # a value. + # Add second meter, first inverter and chp power + ([None, -2.0], [-100, -200.0], [400], Power.from_watts(298.0)), + ([None, -2.0], [-50, -200.0], [300], Power.from_watts(248.0)), + # Case 4: Both first meter and its fallback inverter are unavailable + # (None). Return 0 from failing component according to the + # "nones-are-zero" rule. + ([None, -2.0], [None, -200.0], [300], Power.from_watts(298.0)), + ([None, -10.0], [-20.0, -200.0], [300], Power.from_watts(270.0)), + # Case 5: CHP is unavailable. Return 0 from failing component + # according to the "nones-are-zero" rule. + ([None, -10.0], [-20.0, -200.0], [None], Power.from_watts(-30.0)), + # Case 6: Both meters are unavailable (None). Subscribe for fallback inverter + ([None, None], [-20.0, -200.0], [None], None), + ([None, None], [-20.0, -200.0], [None], Power.from_watts(-220.0)), + ([None, None], [None, -200.0], [None], Power.from_watts(-200.0)), + # Case 7: All components are unavailable (None). Return 0 according to the + # "nones-are-zero" rule. + ([None, None], [None, None], [None], Power.from_watts(0)), + ([None, None], [None, None], [None], Power.from_watts(0)), + ([None, None], [None, None], [300.0], Power.from_watts(300.0)), + ([-200.0, None], [None, -100.0], [50.0], Power.from_watts(-250.0)), + ([-200.0, -200.0], [-10.0, -20.0], [50.0], Power.from_watts(-350.0)), + # Case 8: Meter is unavailable but we already subscribed for inverter + # So don't return None in this case. Just proper formula result. + ([None, -200.0], [-10.0, -100.0], [50.0], Power.from_watts(-160.0)), + + ] + # fmt: on + + for idx, ( + meter_power, + pv_inverter_power, + chp_power, + expected_power, + ) in enumerate(expected_input_output): + await mockgrid.mock_resampler.send_chp_power(chp_power) + await mockgrid.mock_resampler.send_meter_power(meter_power) + await mockgrid.mock_resampler.send_pv_inverter_power(pv_inverter_power) + mockgrid.mock_resampler.next_ts() + + result = await producer_power_receiver.receive() + assert result.value == expected_power, ( + f"Test case {idx} failed:" + + f" meter_power: {meter_power}" + + f" pv_inverter_power {pv_inverter_power}" + + f" chp_power {chp_power}" + + f" expected_power: {expected_power}" + + f" actual_power: {result.value}" + )