diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py
index 4607bccc0d..a085d1b0fd 100644
--- a/parsers/ELEXON.py
+++ b/parsers/ELEXON.py
@@ -11,35 +11,39 @@
 """
 
 import re
-from datetime import date, datetime, time, timedelta, timezone
-from io import StringIO
+from datetime import datetime, timedelta, timezone
 from logging import Logger, getLogger
+from typing import Any
 
-import arrow
-import pandas as pd
-from requests import Session
+from requests import Response, Session
 
-from electricitymap.contrib.config.constants import PRODUCTION_MODES
+from electricitymap.contrib.lib.models.event_lists import (
+    ExchangeList,
+    ProductionBreakdownList,
+)
+from electricitymap.contrib.lib.models.events import ProductionMix, StorageMix
+from electricitymap.contrib.lib.types import ZoneKey
 from parsers.lib.config import refetch_frequency
 from parsers.lib.exceptions import ParserException
-from parsers.lib.utils import get_token
-from parsers.lib.validation import validate
-
-ELEXON_ENDPOINT = "https://api.bmreports.com/BMRS/{}/v1"
+ELEXON_API_ENDPOINT = "https://data.elexon.co.uk/bmrs/api/v1"
+ELEXON_URLS = {
+    "production": "/".join((ELEXON_API_ENDPOINT, "datasets/AGPT/stream")),
+    "production_fuelhh": "/".join((ELEXON_API_ENDPOINT, "datasets/FUELHH/stream")),
+    "exchange": "/".join((ELEXON_API_ENDPOINT, "generation/outturn/interconnectors")),
+}
+ELEXON_START_DATE = datetime(
+    2019, 1, 1, tzinfo=timezone.utc
+)  # ELEXON API only has data from 2019-01-01
+ELEXON_SOURCE = "elexon.co.uk"
 
 ESO_NATIONAL_GRID_ENDPOINT = (
     "https://api.nationalgrideso.com/api/3/action/datastore_search_sql"
 )
+ESO_SOURCE = "nationalgrideso.com"
 
 # A specific report to query most recent data (within 1 month time span + forecast ahead)
 ESO_DEMAND_DATA_UPDATE_ID = "177f6fa4-ae49-4182-81ea-0c6b35f26ca6"
 
-REPORT_META = {
-    "B1620": {"expected_fields": 13, "skiprows": 5},
-    "FUELINST": {"expected_fields": 23, "skiprows": 1},
-    "INTERFUELHH": {"expected_fields": 12, "skiprows": 0},
-}
-
 # 'hydro' key is for hydro production
 # 'hydro storage' key is for hydro storage
 RESOURCE_TYPE_TO_FUEL = {
@@ -84,17 +88,130 @@
     "PUMP_STORAGE_PUMPING": "hydro storage",
 }
 
-EXCHANGES = {
-    "FR->GB": [3, 8, 9],  # IFA, Eleclink, IFA2
-    "GB->GB-NIR": [4],
-    "GB->NL": [5],
-    "GB->IE": [6],
-    "BE->GB": [7],
-    "GB->NO-NO2": [10],  # North Sea Link
-    "DK-DK1->GB": [11],  # Viking Link
+ZONEKEY_TO_INTERCONNECTOR = {
+    "BE->GB": ["Belgium (Nemolink)"],
+    "DK-DK1->GB": ["Denmark (Viking link)"],
+    "FR->GB": ["Eleclink (INTELEC)", "France(IFA)", "IFA2 (INTIFA2)"],
+    "GB->GB-NIR": ["Northern Ireland(Moyle)"],
+    "GB->IE": ["Ireland(East-West)"],
+    "GB->NL": ["Netherlands(BritNed)"],
+    "GB->NO-NO2": ["North Sea Link (INTNSL)"],
 }
 
 
+def query_elexon(url: str, session: Session, params: dict) -> list | dict:
+    # The stream dataset endpoints return a JSON list; the interconnector
+    # outturn endpoint returns a JSON object with a "data" key.
+    r: Response = session.get(url, params=params)
+    if not r.ok:
+        raise ParserException(
+            parser="ELEXON",
+            message=f"Error fetching data: {r.status_code} {r.reason}",
+        )
+    return r.json()
+
+
+def parse_datetime(
+    settlementDate: str | None, settlementPeriod: int | None
+) -> datetime:
+    if settlementDate is None or settlementPeriod is None:
+        raise ParserException(
+            parser="ELEXON",
+            message="settlementDate or settlementPeriod is missing in the API response",
+        )
+    # Settlement period 1 starts at 00:00; each period covers 30 minutes.
+    parsed_datetime = datetime.strptime(settlementDate, "%Y-%m-%d")
+    parsed_datetime += timedelta(hours=(settlementPeriod - 1) / 2)
+    return parsed_datetime.replace(tzinfo=timezone.utc)
+
+
+def query_production(
+    session: Session, target_datetime: datetime, logger: Logger
+) -> ProductionBreakdownList:
+    
"""Fetches production data from the B1620 endpoint from the ELEXON API.""" + production_params = { + "publishDateTimeFrom": (target_datetime - timedelta(days=2)).strftime( + "%Y-%m-%d 00:00" + ), + "publishDateTimeTo": target_datetime.strftime("%Y-%m-%d %H:%M"), + } + production_data = query_elexon( + ELEXON_URLS["production"], session, production_params + ) + + parsed_events = parse_production(production_data, logger, "B1620") + return parsed_events + + +def get_event_value(event: dict[str, Any], key: str) -> float | None: + value = event.get(key) + if value is not None: + return float(value) + + +def parse_production( + production_data: list[dict[str, Any]], logger: Logger, dataset: str +) -> ProductionBreakdownList: + """Parses production events from the ELEXON API. This function is used for the B1620 data or the FUELHH data.""" + dataset_info = { + "B1620": { + "mode_mapping": RESOURCE_TYPE_TO_FUEL, + "mode_key": "psrType", + "quantity_key": "quantity", + }, + "FUELHH": { + "mode_mapping": FUEL_INST_MAPPING, + "mode_key": "fuelType", + "quantity_key": "generation", + }, + } + + mode_mapping = dataset_info[dataset]["mode_mapping"] + mode_key = dataset_info[dataset]["mode_key"] + quantity_key = dataset_info[dataset]["quantity_key"] + + all_production_breakdowns: list[ProductionBreakdownList] = [] + + for event in production_data: + production_breakdown = ProductionBreakdownList(logger=logger) + event_datetime = parse_datetime( + event.get("settlementDate"), event.get("settlementPeriod") + ) + production_mix = ProductionMix() + storage_mix = StorageMix() + + production_mode = mode_mapping[event.get(mode_key)] + + if production_mode == "exchange": + continue + + if production_mode == "hydro storage": + storage_value = get_event_value(event, quantity_key) + if storage_value: + storage_mix.add_value("hydro", -1 * storage_value) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + storage=storage_mix, + source=ELEXON_SOURCE, + datetime=event_datetime, + ) + else: + production_value = get_event_value(event, quantity_key) + if production_value: + production_mix.add_value( + production_mode, production_value, correct_negative_with_zero=True + ) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + production=production_mix, + source=ELEXON_SOURCE, + datetime=event_datetime, + ) + + all_production_breakdowns.append(production_breakdown) + events = ProductionBreakdownList.merge_production_breakdowns( + all_production_breakdowns, logger + ) + return events + + def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: """Get the ids of all historical_demand_data reports""" index = {} @@ -103,7 +220,7 @@ def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: ) data = response.json() pattern = re.compile(r"historic_demand_data_(?P\d+)") - for resource in data["resources"]: + for resource in data.get("result").get("resources"): match = pattern.match(resource["name"]) if match is not None: index[int(match["year"])] = resource["id"] @@ -112,9 +229,10 @@ def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: def query_additional_eso_data( target_datetime: datetime, session: Session -) -> list[dict]: - begin = (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d") - end = (target_datetime + timedelta(days=1)).strftime("%Y-%m-%d") +) -> list[dict[str, Any]]: + """Fetches embedded wind and solar and hydro storage data from the ESO API.""" + begin = (target_datetime - timedelta(days=2)).strftime("%Y-%m-%d") + end = 
target_datetime.strftime("%Y-%m-%d")
     if target_datetime > (datetime.now(tz=timezone.utc) - timedelta(days=30)):
         report_id = ESO_DEMAND_DATA_UPDATE_ID
     else:
@@ -124,337 +242,146 @@ def query_additional_eso_data(
         "sql": f'''SELECT * FROM "{report_id}" WHERE "SETTLEMENT_DATE" BETWEEN '{begin}' AND '{end}' ORDER BY "SETTLEMENT_DATE"'''
     }
     response = session.get(ESO_NATIONAL_GRID_ENDPOINT, params=params)
-    return response.json()["result"]["records"]
-
-
-def query_ELEXON(report, session: Session, params):
-    params["APIKey"] = get_token("ELEXON_TOKEN")
-    return session.get(ELEXON_ENDPOINT.format(report), params=params)
-
-
-def query_exchange(session: Session, target_datetime=None):
-    if target_datetime is None:
-        target_datetime = date.today()
-
-    from_date = (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d")
-    to_date = target_datetime.strftime("%Y-%m-%d")
-
-    params = {"FromDate": from_date, "ToDate": to_date, "ServiceType": "csv"}
-    response = query_ELEXON("INTERFUELHH", session, params)
-    return response.text
+    eso_data = response.json()["result"]["records"]
+    return eso_data
+
+
+def parse_eso_production(
+    production_data: list[dict[str, Any]], logger: Logger
+) -> ProductionBreakdownList:
+    all_production_breakdowns: list[ProductionBreakdownList] = []
+    for event in production_data:
+        production_breakdown = ProductionBreakdownList(logger=logger)
+        event_datetime = parse_datetime(
+            event.get("SETTLEMENT_DATE"), event.get("SETTLEMENT_PERIOD")
+        )
+        production_mix = ProductionMix()
+        storage_mix = StorageMix()
+        for eso_key, mapped_mode in ESO_FUEL_MAPPING.items():
+            if mapped_mode == "hydro storage":
+                storage_value = get_event_value(event, eso_key)
+                if storage_value is not None:
+                    # Per ESO, the -ve sign of PUMP_STORAGE_PUMPING signifies
+                    # pumping load; electricitymap stores pumping as positive,
+                    # so the sign is flipped here.
+                    storage_mix.add_value("hydro", -1 * storage_value)
+                    production_breakdown.append(
+                        zoneKey=ZoneKey("GB"),
+                        storage=storage_mix,
+                        source=ESO_SOURCE,
+                        datetime=event_datetime,
+                    )
+            else:
+                production_value = get_event_value(event, eso_key)
+                if production_value is not None:
+                    production_mix.add_value(
+                        mapped_mode,
+                        production_value,
+                        correct_negative_with_zero=True,
+                    )
+                    production_breakdown.append(
+                        zoneKey=ZoneKey("GB"),
+                        production=production_mix,
+                        source=ESO_SOURCE,
+                        datetime=event_datetime,
+                    )
+
+        all_production_breakdowns.append(production_breakdown)
+    events = ProductionBreakdownList.merge_production_breakdowns(
+        all_production_breakdowns, logger
+    )
+    return events
 
 
-def query_production(
-    session: Session, target_datetime: datetime | None = None, report: str = "B1620"
-):
-    if target_datetime is None:
-        target_datetime = datetime.now()
+def parse_eso_hydro_storage(
+    eso_data: list[dict[str, Any]], logger: Logger
+) -> ProductionBreakdownList:
+    """Parses only the hydro storage data from the ESO API. This data is
+    merged with the B1620 data."""
+    storage_breakdown = ProductionBreakdownList(logger=logger)
+    for event in eso_data:
+        event_datetime = parse_datetime(
+            event.get("SETTLEMENT_DATE"), event.get("SETTLEMENT_PERIOD")
+        )
+        storage_mix = StorageMix()
+        storage_value = get_event_value(event, "PUMP_STORAGE_PUMPING")
+        if storage_value is not None:
+            # Same sign convention as parse_eso_production: ESO reports
+            # pumping load as negative, electricitymap as positive.
+            storage_mix.add_value("hydro", -1 * storage_value)
+            storage_breakdown.append(
+                zoneKey=ZoneKey("GB"),
+                storage=storage_mix,
+                source=ESO_SOURCE,
+                datetime=event_datetime,
+            )
+    return storage_breakdown
 
-    # we can only fetch one date at a time.
-    # if target_datetime is first 30 minutes of the day fetch the day before.
-    # otherwise fetch the day of target_datetime.
-    if target_datetime.time() <= time(0, 30):
-        settlement_date = target_datetime.date() - timedelta(1)
-    else:
-        settlement_date = target_datetime.date()
+def query_production_fuelhh(
+    session: Session, target_datetime: datetime, logger: Logger
+) -> list[dict[str, Any]]:
+    """Fetches production data from the FUELHH endpoint.
+    This endpoint provides the half-hourly generation outturn by fuel type
+    for Great Britain, aggregated by fuel type category with average MW
+    values over each 30-minute settlement period."""
     params = {
-        "SettlementDate": settlement_date.strftime("%Y-%m-%d"),
-        "Period": "*",
-        "ServiceType": "csv",
+        "settlementDateFrom": (target_datetime - timedelta(days=1)).strftime(
+            "%Y-%m-%d"
+        ),
+        "settlementDateTo": target_datetime.strftime("%Y-%m-%d"),
+        "format": "json",
     }
-    if report == "FUELINST":
-        params = {
-            "FromDateTime": (target_datetime - timedelta(days=1))
-            .date()
-            .strftime("%Y-%m-%d %H:%M:%S"),
-            "ToDateTime": (target_datetime + timedelta(days=1))
-            .date()
-            .strftime("%Y-%m-%d %H:%M:%S"),
-            "Period": "*",
-            "ServiceType": "csv",
-        }
-    response = query_ELEXON(report, session, params)
-    return response.text
+    fuelhh_data = query_elexon(ELEXON_URLS["production_fuelhh"], session, params)
+    return fuelhh_data
 
 
-def parse_exchange(
-    zone_key1: str,
-    zone_key2: str,
-    csv_text: str,
-    target_datetime: datetime | None = None,
-    logger: Logger = getLogger(__name__),
-):
-    if not csv_text:
-        return None
-
-    report = REPORT_META["INTERFUELHH"]
-
-    sorted_zone_keys = sorted([zone_key1, zone_key2])
-    exchange = "->".join(sorted_zone_keys)
-    data_points = []
-    lines = csv_text.split("\n")
-
-    # check field count in report is as expected
-    field_count = len(lines[1].split(","))
-    if field_count != report["expected_fields"]:
-        raise ValueError(
-            "Expected {} fields in INTERFUELHH report, got {}".format(
-                report["expected_fields"], field_count
-            )
-        )
-
-    for line in lines[1:-1]:
-        fields = line.split(",")
-
-        # settlement date / period combinations are always local time
-        date = datetime.strptime(fields[1], "%Y%m%d").date()
-        settlement_period = int(fields[2])
-        date_time = datetime_from_date_sp(date, settlement_period)
-
-        data = {
-            "sortedZoneKeys": exchange,
-            "datetime": date_time,
-            "source": "bmreports.com",
-        }
-        # positive value implies import to GB
-        multiplier = -1 if "GB" in sorted_zone_keys[0] else 1
-        net_flow = 0.0  # init
-        for column_index in EXCHANGES[exchange]:
-            # read out all columns providing values for this exchange
-            if fields[column_index] == "":
-                continue  # no value provided for this exchange
-            net_flow += float(fields[column_index]) * multiplier
-        data["netFlow"] = net_flow
-        data_points.append(data)
+def query_and_merge_production_fuelhh_and_eso(
+    session: Session, target_datetime: datetime, logger: Logger
+) -> ProductionBreakdownList:
+    events_fuelhh = query_production_fuelhh(session, target_datetime, logger)
+    parsed_events_fuelhh = parse_production(events_fuelhh, logger, "FUELHH")
+    events_eso = query_additional_eso_data(target_datetime, session)
+    parsed_events_eso = parse_eso_production(events_eso, logger)
 
-    return data_points
-
-
-def parse_production_FUELINST(
-    csv_data: str,
-    target_datetime: datetime | None = None,
-    logger: Logger = getLogger(__name__),
-) -> pd.DataFrame:
-    """A temporary parser for the FUELINST report.
-    This report will be decomissioned sometime in 2023.
- We use it as a replacement for B1620 that doesn't work - at the moment (19/12/2022). - """ - if not csv_data: - raise ParserException("ELEXON.py", "Production file is empty.") - report = REPORT_META["FUELINST"] - # create DataFrame from slice of CSV rows - df = pd.read_csv(StringIO(csv_data), skiprows=1, skipfooter=1, header=None) - # check field count in report is as expected - field_count = len(df.columns) - if field_count != report["expected_fields"]: - raise ParserException( - "ELEXON.py", - "Expected {} fields in FUELINST report, got {}".format( - report["expected_fields"], len(df.columns) - ), - ) - # The file doesn't have a column header, so we need to recreate it. - mapping = {1: "Settlement Date", 2: "Settlement Period", 3: "Spot Time"} - for index, fuel in enumerate(FUEL_INST_MAPPING.values()): - mapping[index + 4] = fuel - df = df.rename(columns=mapping) - df["Settlement Date"] = df["Settlement Date"].apply( - lambda x: datetime.strptime(str(x), "%Y%m%d") - ) - df["Settlement Period"] = df["Settlement Period"].astype(int) - df["datetime"] = df.apply( - lambda x: datetime_from_date_sp(x["Settlement Date"], x["Settlement Period"]), - axis=1, + merged_events = ProductionBreakdownList.merge_production_breakdowns( + [parsed_events_fuelhh, parsed_events_eso], logger, matching_timestamps_only=True ) - return df.set_index("datetime") + return merged_events -def parse_additional_eso_production(raw_data: list[dict]) -> pd.DataFrame: - """Parse additional eso data for embedded wind/solar and hydro storage.""" - df = pd.DataFrame.from_records(raw_data) - df["datetime"] = df.apply( - lambda x: datetime_from_date_sp(x["SETTLEMENT_DATE"], x["SETTLEMENT_PERIOD"]), - axis=1, - ) - df = df.rename(columns=ESO_FUEL_MAPPING) - return df.set_index("datetime") - - -def process_production_events( - fuel_inst_data: pd.DataFrame, eso_data: pd.DataFrame -) -> list[dict]: - """Combine FUELINST report and ESO data together to get the full picture and to EM Format.""" - df = fuel_inst_data.join(eso_data, rsuffix="_eso") - df = df.rename(columns={"wind_eso": "wind", "solar_eso": "solar"}) - df = df.groupby(df.columns, axis=1).sum() - data_points = [] - for time_t in pd.unique(df.index): - time_df = df[df.index == time_t] - - data_point = { - "zoneKey": "GB", - "datetime": time_t.to_pydatetime(), - "source": "bmreports.com", - "production": {}, - "storage": {}, +def query_exchange( + zone_key: ZoneKey, session: Session, target_datetime: datetime, logger: Logger +) -> ExchangeList: + all_exchanges: list[ExchangeList] = [] + for interconnector in ZONEKEY_TO_INTERCONNECTOR[zone_key]: + exchange_params = { + "settlementDateFrom": (target_datetime - timedelta(days=2)).strftime( + "%Y-%m-%d" + ), + "settlementDateTo": target_datetime.strftime("%Y-%m-%d"), + "interconnectorName": interconnector, + "format": "json", } - - for row in time_df.iterrows(): - electricity_production = row[1].to_dict() - for key in electricity_production: - if key in PRODUCTION_MODES: - data_point["production"][key] = electricity_production[key] - elif key == "hydro storage": - # According to National Grid Eso: - # The demand due to pumping at hydro pump storage units; the -ve signifies pumping load. - # We store the pump loading as a positive value and discharge as negative. 
- data_point["storage"]["hydro"] = -electricity_production[key] - - data_points.append(data_point) - return data_points - - -def parse_production( - csv_text: str, - target_datetime: datetime | None = None, - logger: Logger = getLogger(__name__), -): - if not csv_text: - return None - - report = REPORT_META["B1620"] - - # create DataFrame from slice of CSV rows - df = pd.read_csv(StringIO(csv_text), skiprows=report["skiprows"] - 1) - - # check field count in report is as expected - field_count = len(df.columns) - if field_count != report["expected_fields"]: - raise ValueError( - "Expected {} fields in B1620 report, got {}".format( - report["expected_fields"], len(df.columns) + exchange_data = query_elexon( + ELEXON_URLS["exchange"], session, exchange_params + ).get("data") + + if not exchange_data: + raise ParserException( + parser="ELEXON.py", + message=f"No exchange data found for {target_datetime.date()}", ) - ) - - # filter out undesired columns - df = df.iloc[:-1, [7, 8, 9, 4]] - - df["Settlement Date"] = df["Settlement Date"].apply( - lambda x: datetime.strptime(x, "%Y-%m-%d") - ) - df["Settlement Period"] = df["Settlement Period"].astype(int) - df["datetime"] = df.apply( - lambda x: datetime_from_date_sp(x["Settlement Date"], x["Settlement Period"]), - axis=1, - ) - - # map from report fuel names to electricitymap fuel names - fuel_column = "Power System Resource Type" - df[fuel_column] = df[fuel_column].apply(lambda x: RESOURCE_TYPE_TO_FUEL[x]) - - # loop through unique datetimes and create each data point - data_points = [] - for time_t in pd.unique(df["datetime"]): - time_df = df[df["datetime"] == time_t] - - data_point = { - "zoneKey": "GB", - "datetime": time_t.to_pydatetime(), - "source": "bmreports.com", - "production": {}, - "storage": {}, - } - - for row in time_df.iterrows(): - fields = row[1].to_dict() - fuel = fields[fuel_column] - quantity = fields["Quantity"] - - # check if storage value and if so correct key - if "storage" in fuel: - fuel_key = fuel.replace("storage", "").strip() - # ELEXON storage is negative when storing and positive when - # discharging (the opposite to electricitymap) - data_point["storage"][fuel_key] = quantity * -1 - else: - # if/else structure allows summation of multiple quantities - # e.g. 'Wind Onshore' and 'Wind Offshore' both have the - # key 'wind' here. 
-                if fuel in data_point["production"]:
-                    data_point["production"][fuel] += quantity
-                else:
-                    data_point["production"][fuel] = quantity
-
-        data_points.append(data_point)
-
-    return data_points
-
-
-def datetime_from_date_sp(date, sp):
-    datetime = arrow.get(date).shift(minutes=30 * (sp - 1))
-    return datetime.replace(tzinfo="Europe/London").datetime
-
-
-def _fetch_wind(
-    target_datetime: datetime | None = None, logger: Logger = getLogger(__name__)
-):
-    if target_datetime is None:
-        target_datetime = datetime.now()
-
-    # line up with B1620 (main production report) search range
-    d = target_datetime.date()
-    start = d - timedelta(hours=48)
-    end = datetime.combine(d + timedelta(days=1), time(0))
-
-    session = Session()
-    params = {
-        "FromDateTime": start.strftime("%Y-%m-%d %H:%M:%S"),
-        "ToDateTime": end.strftime("%Y-%m-%d %H:%M:%S"),
-        "ServiceType": "csv",
-    }
-    response = query_ELEXON("FUELINST", session, params)
-    csv_text = response.text
-
-    NO_DATA_TXT_ANSWER = "204No Content"
-    if NO_DATA_TXT_ANSWER in csv_text:
-        logger.warning(f"Impossible to fetch wind data for {target_datetime}")
-        return pd.DataFrame(columns=["datetime", "Wind"])
-
-    report = REPORT_META["FUELINST"]
-    df = pd.read_csv(
-        StringIO(csv_text), skiprows=report["skiprows"], skipfooter=1, header=None
-    )
-
-    field_count = len(df.columns)
-    if field_count != report["expected_fields"]:
-        raise ValueError(
-            "Expected {} fields in FUELINST report, got {}".format(
-                report["expected_fields"], len(df.columns)
+        for event in exchange_data:
+            exchange_list = ExchangeList(logger)
+            event_datetime = parse_datetime(
+                event.get("settlementDate"), event.get("settlementPeriod")
            )
-        )
-    df = df.iloc[:, [1, 2, 3, 8]]
-    df.columns = ["Settlement Date", "Settlement Period", "published", "Wind"]
-    df["Settlement Date"] = df["Settlement Date"].apply(
-        lambda x: datetime.strptime(str(x), "%Y%m%d")
-    )
-    df["Settlement Period"] = df["Settlement Period"].astype(int)
-    df["datetime"] = df.apply(
-        lambda x: datetime_from_date_sp(x["Settlement Date"], x["Settlement Period"]),
-        axis=1,
-    )
-
-    df["published"] = df["published"].apply(
-        lambda x: datetime.strptime(str(x), "%Y%m%d%H%M%S")
-    )
-    # get the most recently published value for each datetime
-    idx = df.groupby("datetime")["published"].transform(max) == df["published"]
-    df = df[idx]
-
-    return df[["datetime", "Wind"]]
+            net_flow = get_event_value(event, "generation")
+            # Elexon reports interconnector flow as positive for imports into
+            # GB. Flip the sign when GB is the origin of the zone key (e.g.
+            # GB->IE) so that positive netFlow follows the key direction,
+            # matching the old INTERFUELHH multiplier logic.
+            if net_flow is not None and zone_key.startswith("GB->"):
+                net_flow = -net_flow
+            exchange_list.append(
+                zoneKey=zone_key,
+                netFlow=net_flow,
+                source=ELEXON_SOURCE,
+                datetime=event_datetime,
+            )
+            all_exchanges.append(exchange_list)
+    return ExchangeList.merge_exchanges(all_exchanges, logger)
 
 
 @refetch_frequency(timedelta(days=1))
@@ -466,80 +393,64 @@ def fetch_exchange(
     logger: Logger = getLogger(__name__),
 ):
     session = session or Session()
-    try:
-        target_datetime = arrow.get(target_datetime).datetime
-    except arrow.parser.ParserError as e:
-        raise ValueError(f"Invalid target_datetime: {target_datetime}") from e
-    response = query_exchange(session, target_datetime)
-    data = parse_exchange(zone_key1, zone_key2, response, target_datetime, logger)
-    return data
+    if target_datetime is None:
+        target_datetime = datetime.now(tz=timezone.utc)
+    else:
+        target_datetime = target_datetime.astimezone(timezone.utc)
+
+    exchange_key = ZoneKey("->".join(sorted([zone_key1, zone_key2])))
+    if target_datetime < ELEXON_START_DATE:
+        raise ParserException(
+            parser="ELEXON.py",
+            message=f"Exchange data is not available before {ELEXON_START_DATE.date()}",
+        )
+    exchange_data = query_exchange(exchange_key, session, target_datetime, logger)
+
+    return exchange_data.to_list()
 
 
-# While using the FUELINST report 
we can increase the refetch frequency.
 @refetch_frequency(timedelta(days=2))
 def fetch_production(
-    zone_key: str = "GB",
+    zone_key: ZoneKey = ZoneKey("GB"),
     session: Session | None = None,
     target_datetime: datetime | None = None,
     logger: Logger = getLogger(__name__),
 ) -> list[dict]:
     session = session or Session()
-    try:
-        target_datetime = arrow.get(target_datetime).datetime
-    except arrow.parser.ParserError as e:
-        raise ValueError(f"Invalid target_datetime: {target_datetime}") from e
-    # TODO currently resorting to FUELINST as B1620 reports 0 production in most production
-    # modes at the moment. (16/12/2022) FUELINST will be decomissioned in 2023, so we should
-    # switch back to B1620 at some point.
-    response = query_production(session, target_datetime, "FUELINST")
-    fuel_inst_data = parse_production_FUELINST(response, target_datetime, logger)
-    raw_additional_data = query_additional_eso_data(target_datetime, session)
-    additional_data = parse_additional_eso_production(raw_additional_data)
-    data = process_production_events(fuel_inst_data, additional_data)
-    # We are fetching from FUELINST directly.
-    if False:
-        # At times B1620 has had poor quality data for wind so fetch from FUELINST
-        # But that source is unavailable prior to cutout date
-        HISTORICAL_WIND_CUTOUT = "2016-03-01"
-        FETCH_WIND_FROM_FUELINST = True
-        if target_datetime < arrow.get(HISTORICAL_WIND_CUTOUT).datetime:
-            FETCH_WIND_FROM_FUELINST = False
-        if FETCH_WIND_FROM_FUELINST:
-            wind = _fetch_wind(target_datetime, logger=logger)
-            for entry in data:
-                datetime = entry["datetime"]
-                wind_row = wind[wind["datetime"] == datetime]
-                if len(wind_row):
-                    entry["production"]["wind"] = wind_row.iloc[0]["Wind"]
-                else:
-                    entry["production"]["wind"] = None
-
-    expected_range = {
-        # Historical data might be above the current capacity for coal
-        "coal": (0, 20000),
-        "gas": (100, 60000),
-        "nuclear": (100, 56000),
-        "wind": (0, 600000),
-    }
-    data = [x for x in data if validate(x, logger, expected_range=expected_range)]
-
-    return data
-
-
-if __name__ == "__main__":
-    """Main method, never used by the Electricity Map backend, but handy for testing."""
-
-    print("fetch_production() ->")
-    print(fetch_production())
-
-    print("fetch_exchange(FR, GB) ->")
-    print(fetch_exchange("FR", "GB"))
+    if target_datetime is None:
+        target_datetime = datetime.now(tz=timezone.utc)
+    else:
+        target_datetime = target_datetime.astimezone(timezone.utc)
 
-    print("fetch_exchange(GB, IE) ->")
-    print(fetch_exchange("GB", "IE"))
+    if target_datetime < ELEXON_START_DATE:
+        raise ParserException(
+            parser="ELEXON.py",
+            message=f"Production data is not available before {ELEXON_START_DATE.date()}",
+        )
 
-    print("fetch_exchange(GB, NL) ->")
-    print(fetch_exchange("GB", "NL"))
+    data_b1620 = query_production(session, target_datetime, logger)
 
-    print("fetch_exchange(GB, DK-DK1) ->")
-    print(fetch_exchange("DK-DK1", "GB"))
+    if not validate_bmrs_data(data_b1620):
+        data = query_and_merge_production_fuelhh_and_eso(
+            session, target_datetime, logger
+        )
+    else:
+        # Add hydro pumping data from ESO: B1620 only includes pumped storage
+        # production (injected onto the grid), not pumping (withdrawn from the grid).
+        eso_data = query_additional_eso_data(target_datetime, session)
+        parsed_hydro_storage_data = parse_eso_hydro_storage(eso_data, logger)
+        data = ProductionBreakdownList.merge_production_breakdowns(
+            [data_b1620, parsed_hydro_storage_data],
+            logger,
+            matching_timestamps_only=True,
+        )
    return data.to_list()
+
+
+def validate_bmrs_data(data: 
ProductionBreakdownList) -> bool:
+    """Checks whether the production breakdown events contain a full fuel-type
+    breakdown, rather than just wind and solar, and that data is present at all."""
+    if not data:
+        return False
+    available_production_modes = []
+    for event in data.to_list():
+        available_production_modes.extend(event["production"].keys())
+    # Gas is effectively always present in the GB mix, so a breakdown without
+    # it suggests an incomplete B1620 response.
+    if "gas" not in set(available_production_modes):
+        return False
+    return True
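Reviewer note, not part of the patch: the settlement-period arithmetic in `parse_datetime` underpins every parser above, so here is a pytest-style sketch (hypothetical test file, assuming the repo's usual test setup) pinning down the convention — period 1 = 00:00, period 2 = 00:30, ..., period 48 = 23:30, all rendered as UTC:

```python
# test_elexon_parse_datetime.py — hypothetical sketch, not included in this PR.
from datetime import datetime, timezone

from parsers.ELEXON import parse_datetime


def test_settlement_period_to_datetime():
    # Period 1 is the first half-hour of the settlement date.
    assert parse_datetime("2023-06-01", 1) == datetime(2023, 6, 1, 0, 0, tzinfo=timezone.utc)
    # Each subsequent period advances 30 minutes.
    assert parse_datetime("2023-06-01", 2) == datetime(2023, 6, 1, 0, 30, tzinfo=timezone.utc)
    # Period 48 is the last half-hour of a normal settlement day.
    assert parse_datetime("2023-06-01", 48) == datetime(2023, 6, 1, 23, 30, tzinfo=timezone.utc)
```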
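Since the patch also drops the old `__main__` harness, a minimal replacement for local smoke-testing could look like the sketch below (assumptions: run from the repo root with the project's dependencies installed; the target date is arbitrary):

```python
# run_elexon.py — hypothetical local harness mirroring the removed __main__ block.
from datetime import datetime, timezone

from parsers.ELEXON import fetch_exchange, fetch_production

if __name__ == "__main__":
    # Any timezone-aware datetime after ELEXON_START_DATE works here.
    target = datetime(2023, 6, 1, 12, 0, tzinfo=timezone.utc)

    print("fetch_production() ->")
    print(fetch_production(target_datetime=target))

    print("fetch_exchange(FR, GB) ->")
    print(fetch_exchange("FR", "GB", target_datetime=target))
```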