From 7e1412eb85ed9ab5b1ed87ad529c3b075dda9864 Mon Sep 17 00:00:00 2001 From: Mathilde Daugy Date: Wed, 31 Jan 2024 16:56:21 +0100 Subject: [PATCH 01/11] elexon parser --- parsers/ELEXON.py | 245 ++++++++++++++++++++++++++++------------------ 1 file changed, 152 insertions(+), 93 deletions(-) diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py index 7982c662c9..c9166524f3 100644 --- a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -17,16 +17,24 @@ import arrow import pandas as pd -from requests import Session +from requests import Response, Session from electricitymap.contrib.config.constants import PRODUCTION_MODES +from electricitymap.contrib.lib.models.event_lists import ( + ExchangeList, + ProductionBreakdownList, +) +from electricitymap.contrib.lib.models.events import ProductionMix, StorageMix +from electricitymap.contrib.lib.types import ZoneKey from parsers.lib.config import refetch_frequency from parsers.lib.exceptions import ParserException from parsers.lib.utils import get_token from parsers.lib.validation import validate ELEXON_ENDPOINT = "https://api.bmreports.com/BMRS/{}/v1" - +ELEXON_URLS = {"production":"https://data.elexon.co.uk/bmrs/api/v1/datasets/AGPT/stream", + "exchange": "https://data.elexon.co.uk/bmrs/api/v1/generation/outturn/interconnectors"} +ELEXON_SOURCE = "elexon.co.uk" ESO_NATIONAL_GRID_ENDPOINT = ( "https://api.nationalgrideso.com/api/3/action/datastore_search_sql" ) @@ -94,6 +102,68 @@ "DK-DK1->GB": [11], # Viking Link } +ZONEKEY_TO_INTERCONNECTOR = {'BE->GB': ['Belgium (Nemolink)'], + 'DK-DK1->GB': ['Denmark (Viking link)'], + 'FR->GB':['Eleclink (INTELEC)', 'France(IFA)', 'IFA2 (INTIFA2)'], + 'GB->GB-NIR': ['Northern Ireland(Moyle)'], + 'GB->IE': ['Ireland(East-West)'], + 'GB->NL': ['Netherlands(BritNed)'], + 'GB->NO-NO2': ['North Sea Link (INTNSL)']} + +def query_elexon(url:str, session:Session, params: dict) -> list: + r: Response = session.get(url, params=params) + if not r.ok: + raise ParserException( + parser="ELEXON", + message=f"Error fetching data: {r.status_code} {r.reason}", + ) + return r.json() + +def parse_datetime(settlementDate: str, settlementPeriod: int) -> datetime: + parsed_datetime = datetime.strptime(settlementDate, "%Y-%m-%d") + parsed_datetime += timedelta(hours=(settlementPeriod - 1)/2) + return parsed_datetime.astimezone(timezone.utc) + +def query_production(session: Session, target_datetime: datetime, logger:Logger) -> list: + production_params = {"publishDateTimeFrom": (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d"), "publishDateTimeTo": target_datetime.strftime("%Y-%m-%d")} + production_data = query_elexon(ELEXON_URLS["production"], session, production_params) + all_production_breakdowns : list[ProductionBreakdownList] = [] + for event in production_data: + production_breakdown= ProductionBreakdownList(logger=logger) + event_datetime = parse_datetime(event.get("settlementDate"), event.get("settlementPeriod")) + production_mix = ProductionMix() + storage_mix = StorageMix() + production_mode = RESOURCE_TYPE_TO_FUEL[event.get("psrType")] + + if production_mode == "hydro storage": + storage_mix.add_value("hydro", event.get("quantity")) + production_breakdown.append(zoneKey=ZoneKey("GB"), storage=storage_mix, source=ELEXON_SOURCE, datetime=event_datetime) + else: + production_mix.add_value(production_mode, event.get("quantity")) + production_breakdown.append(zoneKey=ZoneKey("GB"), production=production_mix, source=ELEXON_SOURCE, datetime=event_datetime) + + all_production_breakdowns.append(production_breakdown) + events = 
ProductionBreakdownList.merge_production_breakdowns( + all_production_breakdowns, logger + ) + + return events.to_list() + +def query_exchange(zone_key:ZoneKey, session: Session, target_datetime: datetime, logger:Logger) -> list: + all_exchanges: list[ExchangeList] = [] + for interconnector in ZONEKEY_TO_INTERCONNECTOR[zone_key]: + exchange_params = {"settlementDateFrom": (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d"), "settlementDateTo": target_datetime.strftime("%Y-%m-%d"), "interconnectorName":interconnector, "format":"json"} + exchange_data = query_elexon(ELEXON_URLS["exchange"], session, exchange_params).get("data") + if not exchange_data: + raise ParserException(parser="ELEXON.py", message=f"No exchange data found for {target_datetime.date()}") + for event in exchange_data: + exchange_list = ExchangeList(logger) + event_datetime = parse_datetime(event.get("settlementDate"), event.get("settlementPeriod")) + + exchange_list.append(zoneKey=zone_key, netFlow=event.get("generation"), source=ELEXON_SOURCE, datetime=event_datetime) + all_exchanges.append(exchange_list) + events = ExchangeList.merge_exchanges(all_exchanges, logger) + return events.to_list() def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: """Get the ids of all historical_demand_data reports""" @@ -129,53 +199,55 @@ def query_additional_eso_data( def query_ELEXON(report, session: Session, params): params["APIKey"] = get_token("ELEXON_TOKEN") + breakpoint() return session.get(ELEXON_ENDPOINT.format(report), params=params) -def query_exchange(session: Session, target_datetime=None): - if target_datetime is None: - target_datetime = date.today() - - from_date = (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d") - to_date = target_datetime.strftime("%Y-%m-%d") - - params = {"FromDate": from_date, "ToDate": to_date, "ServiceType": "csv"} - response = query_ELEXON("INTERFUELHH", session, params) - return response.text - - -def query_production( - session: Session, target_datetime: datetime | None = None, report: str = "B1620" -): - if target_datetime is None: - target_datetime = datetime.now() - - # we can only fetch one date at a time. - # if target_datetime is first 30 minutes of the day fetch the day before. - # otherwise fetch the day of target_datetime. 
- if target_datetime.time() <= time(0, 30): - settlement_date = target_datetime.date() - timedelta(1) - else: - settlement_date = target_datetime.date() - - params = { - "SettlementDate": settlement_date.strftime("%Y-%m-%d"), - "Period": "*", - "ServiceType": "csv", - } - if report == "FUELINST": - params = { - "FromDateTime": (target_datetime - timedelta(days=1)) - .date() - .strftime("%Y-%m-%d %H:%M:%S"), - "ToDateTime": (target_datetime + timedelta(days=1)) - .date() - .strftime("%Y-%m-%d %H:%M:%S"), - "Period": "*", - "ServiceType": "csv", - } - response = query_ELEXON(report, session, params) - return response.text +# def query_exchange(session: Session, target_datetime=None): +# if target_datetime is None: +# target_datetime = date.today() + +# from_date = (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d") +# to_date = target_datetime.strftime("%Y-%m-%d") + +# params = {"FromDate": from_date, "ToDate": to_date, "ServiceType": "csv"} +# response = query_ELEXON("INTERFUELHH", session, params) +# return response.text + + +# def query_production( +# session: Session, target_datetime: datetime | None = None, report: str = "B1620" +# ): +# if target_datetime is None: +# target_datetime = datetime.now() + +# # we can only fetch one date at a time. +# # if target_datetime is first 30 minutes of the day fetch the day before. +# # otherwise fetch the day of target_datetime. +# if target_datetime.time() <= time(0, 30): +# settlement_date = target_datetime.date() - timedelta(1) +# else: +# settlement_date = target_datetime.date() + +# params = { +# "SettlementDate": settlement_date.strftime("%Y-%m-%d"), +# "Period": "*", +# "format": "json", +# } +# if report == "FUELINST": +# params = { +# "FromDateTime": (target_datetime - timedelta(days=1)) +# .date() +# .strftime("%Y-%m-%d %H:%M:%S"), +# "ToDateTime": (target_datetime + timedelta(days=1)) +# .date() +# .strftime("%Y-%m-%d %H:%M:%S"), +# "Period": "*", +# "ServiceType": "csv", +# } +# response = query_ELEXON(report, session, params) +# breakpoint() +# return response.text def parse_exchange( @@ -332,6 +404,7 @@ def parse_production( # check field count in report is as expected field_count = len(df.columns) + breakpoint() if field_count != report["expected_fields"]: raise ValueError( "Expected {} fields in B1620 report, got {}".format( @@ -354,12 +427,12 @@ def parse_production( # map from report fuel names to electricitymap fuel names fuel_column = "Power System Resource Type" df[fuel_column] = df[fuel_column].apply(lambda x: RESOURCE_TYPE_TO_FUEL[x]) - + breakpoint() # loop through unique datetimes and create each data point - data_points = list() + all_production_breakdowns = ProductionBreakdownList(logger=logger) for time_t in pd.unique(df["datetime"]): time_df = df[df["datetime"] == time_t] - + data_point = ProductionMix() data_point = { "zoneKey": "GB", "datetime": time_t.to_pydatetime(), @@ -367,7 +440,7 @@ def parse_production( "production": dict(), "storage": dict(), } - + breakpoint() for row in time_df.iterrows(): fields = row[1].to_dict() fuel = fields[fuel_column] @@ -388,9 +461,11 @@ def parse_production( else: data_point["production"][fuel] = quantity - data_points.append(data_point) + all_production_breakdowns.append(zoneKey=ZoneKey("GB"), production=data_point) - return data_points + + + return all_production_breakdowns.to_list() def datetime_from_date_sp(date, sp): @@ -468,8 +543,8 @@ def fetch_exchange( session = session or Session() try: target_datetime = arrow.get(target_datetime).datetime - except 
arrow.parser.ParserError as e: - raise ValueError(f"Invalid target_datetime: {target_datetime}") from e + except arrow.parser.ParserError: + raise ValueError(f"Invalid target_datetime: {target_datetime}") response = query_exchange(session, target_datetime) data = parse_exchange(zone_key1, zone_key2, response, target_datetime, logger) return data @@ -486,33 +561,30 @@ def fetch_production( session = session or Session() try: target_datetime = arrow.get(target_datetime).datetime - except arrow.parser.ParserError as e: - raise ValueError(f"Invalid target_datetime: {target_datetime}") from e + except arrow.parser.ParserError: + raise ValueError(f"Invalid target_datetime: {target_datetime}") # TODO currently resorting to FUELINST as B1620 reports 0 production in most production # modes at the moment. (16/12/2022) FUELINST will be decomissioned in 2023, so we should # switch back to B1620 at some point. - response = query_production(session, target_datetime, "FUELINST") - fuel_inst_data = parse_production_FUELINST(response, target_datetime, logger) - raw_additional_data = query_additional_eso_data(target_datetime, session) - additional_data = parse_additional_eso_production(raw_additional_data) - data = process_production_events(fuel_inst_data, additional_data) - # We are fetching from FUELINST directly. - if False: - # At times B1620 has had poor quality data for wind so fetch from FUELINST - # But that source is unavailable prior to cutout date - HISTORICAL_WIND_CUTOUT = "2016-03-01" - FETCH_WIND_FROM_FUELINST = True - if target_datetime < arrow.get(HISTORICAL_WIND_CUTOUT).datetime: - FETCH_WIND_FROM_FUELINST = False - if FETCH_WIND_FROM_FUELINST: - wind = _fetch_wind(target_datetime, logger=logger) - for entry in data: - datetime = entry["datetime"] - wind_row = wind[wind["datetime"] == datetime] - if len(wind_row): - entry["production"]["wind"] = wind_row.iloc[0]["Wind"] - else: - entry["production"]["wind"] = None + response = query_production(session, target_datetime, "B1620") + breakpoint() + data= parse_production(response, target_datetime, logger) + + # At times B1620 has had poor quality data for wind so fetch from FUELINST + # But that source is unavailable prior to cutout date + HISTORICAL_WIND_CUTOUT = "2016-03-01" + FETCH_WIND_FROM_FUELINST = True + if target_datetime < arrow.get(HISTORICAL_WIND_CUTOUT).datetime: + FETCH_WIND_FROM_FUELINST = False + if FETCH_WIND_FROM_FUELINST: + wind = _fetch_wind(target_datetime, logger=logger) + for entry in data: + datetime = entry["datetime"] + wind_row = wind[wind["datetime"] == datetime] + if len(wind_row): + entry["production"]["wind"] = wind_row.iloc[0]["Wind"] + else: + entry["production"]["wind"] = None required = ["coal", "gas", "nuclear", "wind"] expected_range = { @@ -532,19 +604,6 @@ def fetch_production( if __name__ == "__main__": - """Main method, never used by the Electricity Map backend, but handy for testing.""" - - print("fetch_production() ->") - print(fetch_production()) - - print("fetch_exchange(FR, GB) ->") - print(fetch_exchange("FR", "GB")) - - print("fetch_exchange(GB, IE) ->") - print(fetch_exchange("GB", "IE")) - - print("fetch_exchange(GB, NL) ->") - print(fetch_exchange("GB", "NL")) - print("fetch_exchange(GB, DK-DK1) ->") - print(fetch_exchange("DK-DK1", "GB")) + data = query_exchange("FR->GB",Session(), datetime(2024, 1, 30),logger=getLogger(__name__)) + print(data[-1]) \ No newline at end of file From edf44c8fa408cf930eceed9baebc265df1250242 Mon Sep 17 00:00:00 2001 From: Mathilde Daugy Date: Thu, 1 Feb 2024 
16:01:46 +0100 Subject: [PATCH 02/11] elexon revamp --- parsers/ELEXON.py | 639 ++++++++++++++++------------------------------ 1 file changed, 217 insertions(+), 422 deletions(-) diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py index c9166524f3..68019b6559 100644 --- a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -11,15 +11,11 @@ """ import re -from datetime import date, datetime, time, timedelta, timezone -from io import StringIO +from datetime import datetime, timedelta, timezone from logging import Logger, getLogger -import arrow -import pandas as pd from requests import Response, Session -from electricitymap.contrib.config.constants import PRODUCTION_MODES from electricitymap.contrib.lib.models.event_lists import ( ExchangeList, ProductionBreakdownList, @@ -28,12 +24,17 @@ from electricitymap.contrib.lib.types import ZoneKey from parsers.lib.config import refetch_frequency from parsers.lib.exceptions import ParserException -from parsers.lib.utils import get_token from parsers.lib.validation import validate -ELEXON_ENDPOINT = "https://api.bmreports.com/BMRS/{}/v1" -ELEXON_URLS = {"production":"https://data.elexon.co.uk/bmrs/api/v1/datasets/AGPT/stream", - "exchange": "https://data.elexon.co.uk/bmrs/api/v1/generation/outturn/interconnectors"} +ELEXON_API_ENDPOINT = "https://data.elexon.co.uk/bmrs/api/v1" +ELEXON_URLS = { + "production": "/".join((ELEXON_API_ENDPOINT, "datasets/AGPT/stream")), + "production_fuelhh": "/".join((ELEXON_API_ENDPOINT, "datasets/FUELHH/stream")), + "exchange": "/".join((ELEXON_API_ENDPOINT, "generation/outturn/interconnectors")), +} +ELEXON_START_DATE = datetime( + 2019, 1, 1, tzinfo=timezone.utc +) # ELEXON API only has data from 2019-01-01 ELEXON_SOURCE = "elexon.co.uk" ESO_NATIONAL_GRID_ENDPOINT = ( "https://api.nationalgrideso.com/api/3/action/datastore_search_sql" @@ -42,12 +43,6 @@ # A specific report to query most recent data (within 1 month time span + forecast ahead) ESO_DEMAND_DATA_UPDATE_ID = "177f6fa4-ae49-4182-81ea-0c6b35f26ca6" -REPORT_META = { - "B1620": {"expected_fields": 13, "skiprows": 5}, - "FUELINST": {"expected_fields": 23, "skiprows": 1}, - "INTERFUELHH": {"expected_fields": 12, "skiprows": 0}, -} - # 'hydro' key is for hydro production # 'hydro storage' key is for hydro storage RESOURCE_TYPE_TO_FUEL = { @@ -92,25 +87,18 @@ "PUMP_STORAGE_PUMPING": "hydro storage", } -EXCHANGES = { - "FR->GB": [3, 8, 9], # IFA, Eleclink, IFA2 - "GB->GB-NIR": [4], - "GB->NL": [5], - "GB->IE": [6], - "BE->GB": [7], - "GB->NO-NO2": [10], # North Sea Link - "DK-DK1->GB": [11], # Viking Link +ZONEKEY_TO_INTERCONNECTOR = { + "BE->GB": ["Belgium (Nemolink)"], + "DK-DK1->GB": ["Denmark (Viking link)"], + "FR->GB": ["Eleclink (INTELEC)", "France(IFA)", "IFA2 (INTIFA2)"], + "GB->GB-NIR": ["Northern Ireland(Moyle)"], + "GB->IE": ["Ireland(East-West)"], + "GB->NL": ["Netherlands(BritNed)"], + "GB->NO-NO2": ["North Sea Link (INTNSL)"], } -ZONEKEY_TO_INTERCONNECTOR = {'BE->GB': ['Belgium (Nemolink)'], - 'DK-DK1->GB': ['Denmark (Viking link)'], - 'FR->GB':['Eleclink (INTELEC)', 'France(IFA)', 'IFA2 (INTIFA2)'], - 'GB->GB-NIR': ['Northern Ireland(Moyle)'], - 'GB->IE': ['Ireland(East-West)'], - 'GB->NL': ['Netherlands(BritNed)'], - 'GB->NO-NO2': ['North Sea Link (INTNSL)']} -def query_elexon(url:str, session:Session, params: dict) -> list: +def query_elexon(url: str, session: Session, params: dict) -> list: r: Response = session.get(url, params=params) if not r.ok: raise ParserException( @@ -119,51 +107,90 @@ def query_elexon(url:str, session:Session, params: dict) -> 
list: ) return r.json() + def parse_datetime(settlementDate: str, settlementPeriod: int) -> datetime: parsed_datetime = datetime.strptime(settlementDate, "%Y-%m-%d") - parsed_datetime += timedelta(hours=(settlementPeriod - 1)/2) - return parsed_datetime.astimezone(timezone.utc) + parsed_datetime += timedelta(hours=(settlementPeriod - 1) / 2) + return parsed_datetime.replace(tzinfo=timezone.utc) + + +def query_production( + session: Session, target_datetime: datetime, logger: Logger +) -> list: + """Fetches production data from the B1620 endpoint from the ELEXON API.""" + production_params = { + "publishDateTimeFrom": (target_datetime - timedelta(days=2)).strftime( + "%Y-%m-%d %H:%M" + ), + "publishDateTimeTo": target_datetime.strftime("%Y-%m-%d %H:%M"), + } + production_data = query_elexon( + ELEXON_URLS["production"], session, production_params + ) + + parsed_events = parse_production(production_data, logger, "B1620") + return parsed_events.to_list() + + +def parse_production( + production_data: list[dict[str, any]], logger: Logger, dataset: str +) -> ProductionBreakdownList: + """Parses production events from the ELEXON API. This function is used for the B1620 data or the FUELHH data.""" + dataset_info = { + "B1620": { + "mode_mapping": RESOURCE_TYPE_TO_FUEL, + "mode_key": "psrType", + "quantity_key": "quantity", + }, + "FUELHH": { + "mode_mapping": FUEL_INST_MAPPING, + "mode_key": "fuelType", + "quantity_key": "generation", + }, + } + + mode_mapping = dataset_info[dataset]["mode_mapping"] + mode_key = dataset_info[dataset]["mode_key"] + quantity_key = dataset_info[dataset]["quantity_key"] + + all_production_breakdowns: list[ProductionBreakdownList] = [] -def query_production(session: Session, target_datetime: datetime, logger:Logger) -> list: - production_params = {"publishDateTimeFrom": (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d"), "publishDateTimeTo": target_datetime.strftime("%Y-%m-%d")} - production_data = query_elexon(ELEXON_URLS["production"], session, production_params) - all_production_breakdowns : list[ProductionBreakdownList] = [] for event in production_data: - production_breakdown= ProductionBreakdownList(logger=logger) - event_datetime = parse_datetime(event.get("settlementDate"), event.get("settlementPeriod")) + production_breakdown = ProductionBreakdownList(logger=logger) + event_datetime = parse_datetime( + event.get("settlementDate"), event.get("settlementPeriod") + ) production_mix = ProductionMix() storage_mix = StorageMix() - production_mode = RESOURCE_TYPE_TO_FUEL[event.get("psrType")] + + production_mode = mode_mapping[event.get(mode_key)] + + if production_mode == "exchange": + continue if production_mode == "hydro storage": - storage_mix.add_value("hydro", event.get("quantity")) - production_breakdown.append(zoneKey=ZoneKey("GB"), storage=storage_mix, source=ELEXON_SOURCE, datetime=event_datetime) + storage_mix.add_value("hydro", event.get(quantity_key)) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + storage=storage_mix, + source=ELEXON_SOURCE, + datetime=event_datetime, + ) else: - production_mix.add_value(production_mode, event.get("quantity")) - production_breakdown.append(zoneKey=ZoneKey("GB"), production=production_mix, source=ELEXON_SOURCE, datetime=event_datetime) + production_mix.add_value(production_mode, event.get(quantity_key)) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + production=production_mix, + source=ELEXON_SOURCE, + datetime=event_datetime, + ) all_production_breakdowns.append(production_breakdown) events = 
ProductionBreakdownList.merge_production_breakdowns( all_production_breakdowns, logger ) + return events - return events.to_list() - -def query_exchange(zone_key:ZoneKey, session: Session, target_datetime: datetime, logger:Logger) -> list: - all_exchanges: list[ExchangeList] = [] - for interconnector in ZONEKEY_TO_INTERCONNECTOR[zone_key]: - exchange_params = {"settlementDateFrom": (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d"), "settlementDateTo": target_datetime.strftime("%Y-%m-%d"), "interconnectorName":interconnector, "format":"json"} - exchange_data = query_elexon(ELEXON_URLS["exchange"], session, exchange_params).get("data") - if not exchange_data: - raise ParserException(parser="ELEXON.py", message=f"No exchange data found for {target_datetime.date()}") - for event in exchange_data: - exchange_list = ExchangeList(logger) - event_datetime = parse_datetime(event.get("settlementDate"), event.get("settlementPeriod")) - - exchange_list.append(zoneKey=zone_key, netFlow=event.get("generation"), source=ELEXON_SOURCE, datetime=event_datetime) - all_exchanges.append(exchange_list) - events = ExchangeList.merge_exchanges(all_exchanges, logger) - return events.to_list() def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: """Get the ids of all historical_demand_data reports""" @@ -173,7 +200,7 @@ def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: ) data = response.json() pattern = re.compile(r"historic_demand_data_(?P\d+)") - for resource in data["resources"]: + for resource in data.get("result").get("resources"): match = pattern.match(resource["name"]) if match is not None: index[int(match["year"])] = resource["id"] @@ -181,8 +208,9 @@ def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: def query_additional_eso_data( - target_datetime: datetime, session: Session -) -> list[dict]: + target_datetime: datetime, session: Session, logger: Logger +) -> ProductionBreakdownList: + """Fetches embedded wind and solar and hydro storage data from the ESO API.""" begin = (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d") end = (target_datetime + timedelta(days=1)).strftime("%Y-%m-%d") if target_datetime > (datetime.now(tz=timezone.utc) - timedelta(days=30)): @@ -194,342 +222,121 @@ def query_additional_eso_data( "sql": f'''SELECT * FROM "{report_id}" WHERE "SETTLEMENT_DATE" BETWEEN '{begin}' AND '{end}' ORDER BY "SETTLEMENT_DATE"''' } response = session.get(ESO_NATIONAL_GRID_ENDPOINT, params=params) - return response.json()["result"]["records"] - - -def query_ELEXON(report, session: Session, params): - params["APIKey"] = get_token("ELEXON_TOKEN") - breakpoint() - return session.get(ELEXON_ENDPOINT.format(report), params=params) - - -# def query_exchange(session: Session, target_datetime=None): -# if target_datetime is None: -# target_datetime = date.today() - -# from_date = (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d") -# to_date = target_datetime.strftime("%Y-%m-%d") - -# params = {"FromDate": from_date, "ToDate": to_date, "ServiceType": "csv"} -# response = query_ELEXON("INTERFUELHH", session, params) -# return response.text - - -# def query_production( -# session: Session, target_datetime: datetime | None = None, report: str = "B1620" -# ): -# if target_datetime is None: -# target_datetime = datetime.now() - -# # we can only fetch one date at a time. -# # if target_datetime is first 30 minutes of the day fetch the day before. -# # otherwise fetch the day of target_datetime. 
-# if target_datetime.time() <= time(0, 30): -# settlement_date = target_datetime.date() - timedelta(1) -# else: -# settlement_date = target_datetime.date() - -# params = { -# "SettlementDate": settlement_date.strftime("%Y-%m-%d"), -# "Period": "*", -# "format": "json", -# } -# if report == "FUELINST": -# params = { -# "FromDateTime": (target_datetime - timedelta(days=1)) -# .date() -# .strftime("%Y-%m-%d %H:%M:%S"), -# "ToDateTime": (target_datetime + timedelta(days=1)) -# .date() -# .strftime("%Y-%m-%d %H:%M:%S"), -# "Period": "*", -# "ServiceType": "csv", -# } -# response = query_ELEXON(report, session, params) -# breakpoint() -# return response.text - - -def parse_exchange( - zone_key1: str, - zone_key2: str, - csv_text: str, - target_datetime: datetime | None = None, - logger: Logger = getLogger(__name__), -): - if not csv_text: - return None - - report = REPORT_META["INTERFUELHH"] - - sorted_zone_keys = sorted([zone_key1, zone_key2]) - exchange = "->".join(sorted_zone_keys) - data_points = list() - lines = csv_text.split("\n") - - # check field count in report is as expected - field_count = len(lines[1].split(",")) - if field_count != report["expected_fields"]: - raise ValueError( - "Expected {} fields in INTERFUELHH report, got {}".format( - report["expected_fields"], field_count - ) - ) - - for line in lines[1:-1]: - fields = line.split(",") - - # settlement date / period combinations are always local time - date = datetime.strptime(fields[1], "%Y%m%d").date() - settlement_period = int(fields[2]) - date_time = datetime_from_date_sp(date, settlement_period) - - data = { - "sortedZoneKeys": exchange, - "datetime": date_time, - "source": "bmreports.com", - } + eso_data = response.json()["result"]["records"] - # positive value implies import to GB - multiplier = -1 if "GB" in sorted_zone_keys[0] else 1 - net_flow = 0.0 # init - for column_index in EXCHANGES[exchange]: - # read out all columns providing values for this exchange - if fields[column_index] == "": - continue # no value provided for this exchange - net_flow += float(fields[column_index]) * multiplier - data["netFlow"] = net_flow - data_points.append(data) + parsed_events = parse_eso_production(eso_data, logger) - return data_points + return parsed_events -def parse_production_FUELINST( - csv_data: str, - target_datetime: datetime | None = None, - logger: Logger = getLogger(__name__), -) -> pd.DataFrame: - """A temporary parser for the FUELINST report. - This report will be decomissioned sometime in 2023. - We use it as a replacement for B1620 that doesn't work - at the moment (19/12/2022). 
- """ - if not csv_data: - raise ParserException("ELEXON.py", "Production file is empty.") - report = REPORT_META["FUELINST"] - # create DataFrame from slice of CSV rows - df = pd.read_csv(StringIO(csv_data), skiprows=1, skipfooter=1, header=None) - # check field count in report is as expected - field_count = len(df.columns) - if field_count != report["expected_fields"]: - raise ParserException( - "ELEXON.py", - "Expected {} fields in FUELINST report, got {}".format( - report["expected_fields"], len(df.columns) - ), +def parse_eso_production( + production_data: list[dict[str, any]], logger: Logger +) -> ProductionBreakdownList: + all_production_breakdowns: list[ProductionBreakdownList] = [] + for event in production_data: + production_breakdown = ProductionBreakdownList(logger=logger) + event_datetime = parse_datetime( + event.get("SETTLEMENT_DATE"), event.get("SETTLEMENT_PERIOD") ) - # The file doesn't have a column header, so we need to recreate it. - mapping = {1: "Settlement Date", 2: "Settlement Period", 3: "Spot Time"} - for index, fuel in enumerate(FUEL_INST_MAPPING.values()): - mapping[index + 4] = fuel - df = df.rename(columns=mapping) - df["Settlement Date"] = df["Settlement Date"].apply( - lambda x: datetime.strptime(str(x), "%Y%m%d") - ) - df["Settlement Period"] = df["Settlement Period"].astype(int) - df["datetime"] = df.apply( - lambda x: datetime_from_date_sp(x["Settlement Date"], x["Settlement Period"]), - axis=1, - ) - return df.set_index("datetime") - + production_mix = ProductionMix() + storage_mix = StorageMix() + for production_mode in ESO_FUEL_MAPPING.keys(): + if ESO_FUEL_MAPPING[production_mode] == "hydro storage": + storage_mix.add_value("hydro", event.get(production_mode)) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + storage=storage_mix, + source=ELEXON_SOURCE, + datetime=event_datetime, + ) + else: + production_mix.add_value( + ESO_FUEL_MAPPING[production_mode], event.get(production_mode) + ) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + production=production_mix, + source=ELEXON_SOURCE, + datetime=event_datetime, + ) -def parse_additional_eso_production(raw_data: list[dict]) -> pd.DataFrame: - """Parse additional eso data for embedded wind/solar and hydro storage.""" - df = pd.DataFrame.from_records(raw_data) - df["datetime"] = df.apply( - lambda x: datetime_from_date_sp(x["SETTLEMENT_DATE"], x["SETTLEMENT_PERIOD"]), - axis=1, + all_production_breakdowns.append(production_breakdown) + events = ProductionBreakdownList.merge_production_breakdowns( + all_production_breakdowns, logger ) - df = df.rename(columns=ESO_FUEL_MAPPING) - return df.set_index("datetime") + return events -def process_production_events( - fuel_inst_data: pd.DataFrame, eso_data: pd.DataFrame -) -> list[dict]: - """Combine FUELINST report and ESO data together to get the full picture and to EM Format.""" - df = fuel_inst_data.join(eso_data, rsuffix="_eso") - df = df.rename(columns={"wind_eso": "wind", "solar_eso": "solar"}) - df = df.groupby(df.columns, axis=1).sum() - data_points = list() - for time_t in pd.unique(df.index): - time_df = df[df.index == time_t] - - data_point = { - "zoneKey": "GB", - "datetime": time_t.to_pydatetime(), - "source": "bmreports.com", - "production": dict(), - "storage": dict(), - } - - for row in time_df.iterrows(): - electricity_production = row[1].to_dict() - for key in electricity_production.keys(): - if key in PRODUCTION_MODES: - data_point["production"][key] = electricity_production[key] - elif key == "hydro storage": - # According to 
National Grid Eso: - # The demand due to pumping at hydro pump storage units; the -ve signifies pumping load. - # We store the pump loading as a positive value and discharge as negative. - data_point["storage"]["hydro"] = -electricity_production[key] +def query_production_fuelhh( + session: Session, target_datetime: datetime, logger: Logger +) -> ProductionBreakdownList: + """Fetches production data from the FUELHH endpoint. + This endpoint provides the half-hourly generation outturn (Generation By Fuel type) + to give our users an indication of the generation outturn for Great Britain. + The data is aggregated by Fuel Type category and updated at 30-minute intervals + with average MW values over 30 minutes for each category.""" + params = { + "settlementDateFrom": (target_datetime - timedelta(days=1)).strftime( + "%Y-%m-%d" + ), + "settlementDateTo": target_datetime.strftime("%Y-%m-%d"), + "format": "json", + } - data_points.append(data_point) - return data_points + response = query_elexon(ELEXON_URLS["production_fuelhh"], session, params) + parsed_events_fuelhh = parse_production(response, logger, "FUELHH") + return parsed_events_fuelhh -def parse_production( - csv_text: str, - target_datetime: datetime | None = None, - logger: Logger = getLogger(__name__), +def query_and_merge_production_fuelhh_and_eso( + session: Session, target_datetime: datetime, logger: Logger ): - if not csv_text: - return None + events_fuelhh = query_production_fuelhh(session, target_datetime, logger) + events_eso = query_additional_eso_data(target_datetime, session, logger) - report = REPORT_META["B1620"] - - # create DataFrame from slice of CSV rows - df = pd.read_csv(StringIO(csv_text), skiprows=report["skiprows"] - 1) - - # check field count in report is as expected - field_count = len(df.columns) - breakpoint() - if field_count != report["expected_fields"]: - raise ValueError( - "Expected {} fields in B1620 report, got {}".format( - report["expected_fields"], len(df.columns) - ) - ) - - # filter out undesired columns - df = df.iloc[:-1, [7, 8, 9, 4]] - - df["Settlement Date"] = df["Settlement Date"].apply( - lambda x: datetime.strptime(x, "%Y-%m-%d") + merged_events = ProductionBreakdownList.merge_production_breakdowns( + [events_fuelhh, events_eso], logger ) - df["Settlement Period"] = df["Settlement Period"].astype(int) - df["datetime"] = df.apply( - lambda x: datetime_from_date_sp(x["Settlement Date"], x["Settlement Period"]), - axis=1, - ) - - # map from report fuel names to electricitymap fuel names - fuel_column = "Power System Resource Type" - df[fuel_column] = df[fuel_column].apply(lambda x: RESOURCE_TYPE_TO_FUEL[x]) - breakpoint() - # loop through unique datetimes and create each data point - all_production_breakdowns = ProductionBreakdownList(logger=logger) - for time_t in pd.unique(df["datetime"]): - time_df = df[df["datetime"] == time_t] - data_point = ProductionMix() - data_point = { - "zoneKey": "GB", - "datetime": time_t.to_pydatetime(), - "source": "bmreports.com", - "production": dict(), - "storage": dict(), - } - breakpoint() - for row in time_df.iterrows(): - fields = row[1].to_dict() - fuel = fields[fuel_column] - quantity = fields["Quantity"] - - # check if storage value and if so correct key - if "storage" in fuel: - fuel_key = fuel.replace("storage", "").strip() - # ELEXON storage is negative when storing and positive when - # discharging (the opposite to electricitymap) - data_point["storage"][fuel_key] = quantity * -1 - else: - # if/else structure allows summation of multiple 
quantities - # e.g. 'Wind Onshore' and 'Wind Offshore' both have the - # key 'wind' here. - if fuel in data_point["production"].keys(): - data_point["production"][fuel] += quantity - else: - data_point["production"][fuel] = quantity - - all_production_breakdowns.append(zoneKey=ZoneKey("GB"), production=data_point) - - - - return all_production_breakdowns.to_list() - - -def datetime_from_date_sp(date, sp): - datetime = arrow.get(date).shift(minutes=30 * (sp - 1)) - return datetime.replace(tzinfo="Europe/London").datetime + return merged_events.to_list() -def _fetch_wind( - target_datetime: datetime | None = None, logger: Logger = getLogger(__name__) -): - if target_datetime is None: - target_datetime = datetime.now() - - # line up with B1620 (main production report) search range - d = target_datetime.date() - start = d - timedelta(hours=48) - end = datetime.combine(d + timedelta(days=1), time(0)) - - session = Session() - params = { - "FromDateTime": start.strftime("%Y-%m-%d %H:%M:%S"), - "ToDateTime": end.strftime("%Y-%m-%d %H:%M:%S"), - "ServiceType": "csv", - } - response = query_ELEXON("FUELINST", session, params) - csv_text = response.text - - NO_DATA_TXT_ANSWER = "204No Content" - if NO_DATA_TXT_ANSWER in csv_text: - logger.warning(f"Impossible to fetch wind data for {target_datetime}") - return pd.DataFrame(columns=["datetime", "Wind"]) - - report = REPORT_META["FUELINST"] - df = pd.read_csv( - StringIO(csv_text), skiprows=report["skiprows"], skipfooter=1, header=None - ) +def query_exchange( + zone_key: ZoneKey, session: Session, target_datetime: datetime, logger: Logger +) -> list: + all_exchanges: list[ExchangeList] = [] + for interconnector in ZONEKEY_TO_INTERCONNECTOR[zone_key]: + exchange_params = { + "settlementDateFrom": (target_datetime - timedelta(days=2)).strftime( + "%Y-%m-%d" + ), + "settlementDateTo": target_datetime.strftime("%Y-%m-%d"), + "interconnectorName": interconnector, + "format": "json", + } + exchange_data = query_elexon( + ELEXON_URLS["exchange"], session, exchange_params + ).get("data") - field_count = len(df.columns) - if field_count != report["expected_fields"]: - raise ValueError( - "Expected {} fields in FUELINST report, got {}".format( - report["expected_fields"], len(df.columns) + if not exchange_data: + raise ParserException( + parser="ELEXON.py", + message=f"No exchange data found for {target_datetime.date()}", + ) + for event in exchange_data: + exchange_list = ExchangeList(logger) + event_datetime = parse_datetime( + event.get("settlementDate"), event.get("settlementPeriod") ) - ) - - df = df.iloc[:, [1, 2, 3, 8]] - df.columns = ["Settlement Date", "Settlement Period", "published", "Wind"] - df["Settlement Date"] = df["Settlement Date"].apply( - lambda x: datetime.strptime(str(x), "%Y%m%d") - ) - df["Settlement Period"] = df["Settlement Period"].astype(int) - df["datetime"] = df.apply( - lambda x: datetime_from_date_sp(x["Settlement Date"], x["Settlement Period"]), - axis=1, - ) - - df["published"] = df["published"].apply( - lambda x: datetime.strptime(str(x), "%Y%m%d%H%M%S") - ) - # get the most recently published value for each datetime - idx = df.groupby("datetime")["published"].transform(max) == df["published"] - df = df[idx] - return df[["datetime", "Wind"]] + exchange_list.append( + zoneKey=zone_key, + netFlow=event.get("generation"), + source=ELEXON_SOURCE, + datetime=event_datetime, + ) + all_exchanges.append(exchange_list) + events = ExchangeList.merge_exchanges(all_exchanges, logger) + return events.to_list() 
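# A quick sanity check of the settlement-period arithmetic used by the new
# helpers (an illustrative sketch, not part of the patch; it assumes the BMRS
# convention of half-hour settlement periods numbered from 1, which
# parse_datetime maps straight onto UTC):
#
#     parse_datetime("2024-01-30", 1)   # -> 2024-01-30 00:00 UTC
#     parse_datetime("2024-01-30", 48)  # -> 2024-01-30 23:30 UTC
#
# Settlement dates are nominally local (Europe/London), so clock-change days
# carry 46 or 50 periods; the straight-to-UTC mapping trades that edge case
# for simplicity.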
@refetch_frequency(timedelta(days=1)) @@ -541,50 +348,45 @@ def fetch_exchange( logger: Logger = getLogger(__name__), ): session = session or Session() - try: - target_datetime = arrow.get(target_datetime).datetime - except arrow.parser.ParserError: - raise ValueError(f"Invalid target_datetime: {target_datetime}") - response = query_exchange(session, target_datetime) - data = parse_exchange(zone_key1, zone_key2, response, target_datetime, logger) - return data + if target_datetime is None: + target_datetime = datetime.now(tz=timezone.utc) + + exchangeKey = ZoneKey("->".join(sorted([zone_key1, zone_key2]))) + if target_datetime < ELEXON_START_DATE: + raise ParserException( + parser="ELEXON.py", + message=f"Production data is not available before {ELEXON_START_DATE.date()}", + ) + exchange_data = query_exchange(exchangeKey, session, target_datetime, logger) + + return exchange_data -# While using the FUELINST report we can increase the refetch frequency. @refetch_frequency(timedelta(days=2)) def fetch_production( - zone_key: str = "GB", + zone_key: ZoneKey = ZoneKey("GB"), session: Session | None = None, target_datetime: datetime | None = None, logger: Logger = getLogger(__name__), ) -> list[dict]: session = session or Session() - try: - target_datetime = arrow.get(target_datetime).datetime - except arrow.parser.ParserError: - raise ValueError(f"Invalid target_datetime: {target_datetime}") - # TODO currently resorting to FUELINST as B1620 reports 0 production in most production - # modes at the moment. (16/12/2022) FUELINST will be decomissioned in 2023, so we should - # switch back to B1620 at some point. - response = query_production(session, target_datetime, "B1620") - breakpoint() - data= parse_production(response, target_datetime, logger) - - # At times B1620 has had poor quality data for wind so fetch from FUELINST - # But that source is unavailable prior to cutout date - HISTORICAL_WIND_CUTOUT = "2016-03-01" - FETCH_WIND_FROM_FUELINST = True - if target_datetime < arrow.get(HISTORICAL_WIND_CUTOUT).datetime: - FETCH_WIND_FROM_FUELINST = False - if FETCH_WIND_FROM_FUELINST: - wind = _fetch_wind(target_datetime, logger=logger) - for entry in data: - datetime = entry["datetime"] - wind_row = wind[wind["datetime"] == datetime] - if len(wind_row): - entry["production"]["wind"] = wind_row.iloc[0]["Wind"] - else: - entry["production"]["wind"] = None + if target_datetime is None: + target_datetime = datetime.now(tz=timezone.utc) + else: + target_datetime = target_datetime.astimezone(timezone.utc) + + if target_datetime < ELEXON_START_DATE: + raise ParserException( + parser="ELEXON.py", + message=f"Production data is not available before {ELEXON_START_DATE.date()}", + ) + + data = query_production(session, target_datetime, logger) + + if not len(data): + data = query_and_merge_production_fuelhh_and_eso( + session, target_datetime, logger + ) required = ["coal", "gas", "nuclear", "wind"] expected_range = { @@ -599,11 +401,4 @@ def fetch_production( for x in data if validate(x, logger, required=required, expected_range=expected_range) ] - return data - - -if __name__ == "__main__": - - data = query_exchange("FR->GB",Session(), datetime(2024, 1, 30),logger=getLogger(__name__)) - print(data[-1]) \ No newline at end of file From 1f37e65e391b8a334a46aacf664438e440b177d5 Mon Sep 17 00:00:00 2001 From: Mathilde Daugy Date: Fri, 2 Feb 2024 13:47:29 +0100 Subject: [PATCH 03/11] other elexon update --- parsers/ELEXON.py | 63 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 
20 deletions(-) diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py index 68019b6559..739a2c61b0 100644 --- a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -39,6 +39,7 @@ ESO_NATIONAL_GRID_ENDPOINT = ( "https://api.nationalgrideso.com/api/3/action/datastore_search_sql" ) +ESO_SOURCE = "nationalgrideso.com" # A specific report to query most recent data (within 1 month time span + forecast ahead) ESO_DEMAND_DATA_UPDATE_ID = "177f6fa4-ae49-4182-81ea-0c6b35f26ca6" @@ -120,16 +121,16 @@ def query_production( """Fetches production data from the B1620 endpoint from the ELEXON API.""" production_params = { "publishDateTimeFrom": (target_datetime - timedelta(days=2)).strftime( - "%Y-%m-%d %H:%M" + "%Y-%m-%d 00:00" ), "publishDateTimeTo": target_datetime.strftime("%Y-%m-%d %H:%M"), } production_data = query_elexon( ELEXON_URLS["production"], session, production_params ) - + breakpoint() parsed_events = parse_production(production_data, logger, "B1620") - return parsed_events.to_list() + return parsed_events def parse_production( @@ -169,7 +170,7 @@ def parse_production( continue if production_mode == "hydro storage": - storage_mix.add_value("hydro", event.get(quantity_key)) + storage_mix.add_value("hydro", -1*event.get(quantity_key)) production_breakdown.append( zoneKey=ZoneKey("GB"), storage=storage_mix, @@ -209,10 +210,10 @@ def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: def query_additional_eso_data( target_datetime: datetime, session: Session, logger: Logger -) -> ProductionBreakdownList: +) -> list[dict[str, any]]: """Fetches embedded wind and solar and hydro storage data from the ESO API.""" - begin = (target_datetime - timedelta(days=1)).strftime("%Y-%m-%d") - end = (target_datetime + timedelta(days=1)).strftime("%Y-%m-%d") + begin = (target_datetime - timedelta(days=2)).strftime("%Y-%m-%d") + end = (target_datetime).strftime("%Y-%m-%d") if target_datetime > (datetime.now(tz=timezone.utc) - timedelta(days=30)): report_id = ESO_DEMAND_DATA_UPDATE_ID else: @@ -223,10 +224,7 @@ def query_additional_eso_data( } response = session.get(ESO_NATIONAL_GRID_ENDPOINT, params=params) eso_data = response.json()["result"]["records"] - - parsed_events = parse_eso_production(eso_data, logger) - - return parsed_events + return eso_data def parse_eso_production( @@ -246,7 +244,7 @@ def parse_eso_production( production_breakdown.append( zoneKey=ZoneKey("GB"), storage=storage_mix, - source=ELEXON_SOURCE, + source=ESO_SOURCE, datetime=event_datetime, ) else: @@ -256,7 +254,7 @@ def parse_eso_production( production_breakdown.append( zoneKey=ZoneKey("GB"), production=production_mix, - source=ELEXON_SOURCE, + source=ESO_SOURCE, datetime=event_datetime, ) @@ -266,10 +264,27 @@ def parse_eso_production( ) return events +def parse_eso_hydro_storage(eso_data: list[dict[str, any]], logger: Logger ): + """Parses only hydro storage data from the ESO API. 
This data will be merged with the B1620 data""" + storage_breakdown = ProductionBreakdownList(logger=logger) + for event in eso_data: + event_datetime = parse_datetime( + event.get("SETTLEMENT_DATE"), event.get("SETTLEMENT_PERIOD") + ) + storage_mix = StorageMix() + + storage_mix.add_value("hydro", float(event.get("PUMP_STORAGE_PUMPING"))) + storage_breakdown.append( + zoneKey=ZoneKey("GB"), + storage=storage_mix, + source=ESO_SOURCE, + datetime=event_datetime, + ) + return storage_breakdown def query_production_fuelhh( session: Session, target_datetime: datetime, logger: Logger -) -> ProductionBreakdownList: +) -> list[dict[str, any]]: """Fetches production data from the FUELHH endpoint. This endpoint provides the half-hourly generation outturn (Generation By Fuel type) to give our users an indication of the generation outturn for Great Britain. @@ -283,19 +298,21 @@ def query_production_fuelhh( "format": "json", } - response = query_elexon(ELEXON_URLS["production_fuelhh"], session, params) - parsed_events_fuelhh = parse_production(response, logger, "FUELHH") - return parsed_events_fuelhh + fuelhh_data = query_elexon(ELEXON_URLS["production_fuelhh"], session, params) + + return fuelhh_data def query_and_merge_production_fuelhh_and_eso( session: Session, target_datetime: datetime, logger: Logger ): events_fuelhh = query_production_fuelhh(session, target_datetime, logger) + parsed_events_fuelhh = parse_production(events_fuelhh, logger, "FUELHH") events_eso = query_additional_eso_data(target_datetime, session, logger) + parsed_events_eso = parse_eso_production(events_eso, logger) merged_events = ProductionBreakdownList.merge_production_breakdowns( - [events_fuelhh, events_eso], logger + [parsed_events_fuelhh, parsed_events_eso], logger ) return merged_events.to_list() @@ -382,12 +399,18 @@ def fetch_production( ) data = query_production(session, target_datetime, logger) - if not len(data): data = query_and_merge_production_fuelhh_and_eso( session, target_datetime, logger ) - + else: + # add hydro pumping data from ESO (B1620 only includes pumped storage production (injected on the grid) and not the pumping (withdrawn from the grid) + eso_data = query_additional_eso_data(target_datetime, session, logger) + parsed_hydro_storage_data = parse_eso_hydro_storage(eso_data, logger) + breakpoint() + data = ProductionBreakdownList.merge_production_breakdowns( + [data, parsed_hydro_storage_data], logger, matching_timestamps_only=True + ).to_list() required = ["coal", "gas", "nuclear", "wind"] expected_range = { # Historical data might be above the current capacity for coal From 820e2a29f90267ad760539f55705ff03f5348e08 Mon Sep 17 00:00:00 2001 From: Mathilde Daugy Date: Wed, 21 Feb 2024 13:01:09 +0100 Subject: [PATCH 04/11] PR update --- parsers/ELEXON.py | 123 ++++++++++++++++++++++++---------------------- 1 file changed, 63 insertions(+), 60 deletions(-) diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py index 739a2c61b0..c20f2a0253 100644 --- a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -13,6 +13,7 @@ import re from datetime import datetime, timedelta, timezone from logging import Logger, getLogger +from typing import Any from requests import Response, Session @@ -24,7 +25,6 @@ from electricitymap.contrib.lib.types import ZoneKey from parsers.lib.config import refetch_frequency from parsers.lib.exceptions import ParserException -from parsers.lib.validation import validate ELEXON_API_ENDPOINT = "https://data.elexon.co.uk/bmrs/api/v1" ELEXON_URLS = { @@ -117,7 +117,7 @@ def 
parse_datetime(settlementDate: str, settlementPeriod: int) -> datetime: def query_production( session: Session, target_datetime: datetime, logger: Logger -) -> list: +) -> ProductionBreakdownList: """Fetches production data from the B1620 endpoint from the ELEXON API.""" production_params = { "publishDateTimeFrom": (target_datetime - timedelta(days=2)).strftime( @@ -128,13 +128,19 @@ def query_production( production_data = query_elexon( ELEXON_URLS["production"], session, production_params ) - breakpoint() + parsed_events = parse_production(production_data, logger, "B1620") return parsed_events +def get_event_value(event: dict[str, Any], key: str) -> float | None: + value = event.get(key) + if value is not None: + return float(value) + + def parse_production( - production_data: list[dict[str, any]], logger: Logger, dataset: str + production_data: list[dict[str, Any]], logger: Logger, dataset: str ) -> ProductionBreakdownList: """Parses production events from the ELEXON API. This function is used for the B1620 data or the FUELHH data.""" dataset_info = { @@ -170,21 +176,25 @@ def parse_production( continue if production_mode == "hydro storage": - storage_mix.add_value("hydro", -1*event.get(quantity_key)) - production_breakdown.append( - zoneKey=ZoneKey("GB"), - storage=storage_mix, - source=ELEXON_SOURCE, - datetime=event_datetime, - ) + storage_value = get_event_value(event, quantity_key) + if storage_value: + storage_mix.add_value("hydro", -1 * storage_value) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + storage=storage_mix, + source=ELEXON_SOURCE, + datetime=event_datetime, + ) else: - production_mix.add_value(production_mode, event.get(quantity_key)) - production_breakdown.append( - zoneKey=ZoneKey("GB"), - production=production_mix, - source=ELEXON_SOURCE, - datetime=event_datetime, - ) + production_value = get_event_value(event, quantity_key) + if production_value: + production_mix.add_value(production_mode, production_value) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + production=production_mix, + source=ELEXON_SOURCE, + datetime=event_datetime, + ) all_production_breakdowns.append(production_breakdown) events = ProductionBreakdownList.merge_production_breakdowns( @@ -210,7 +220,7 @@ def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: def query_additional_eso_data( target_datetime: datetime, session: Session, logger: Logger -) -> list[dict[str, any]]: +) -> list[dict[str, Any]]: """Fetches embedded wind and solar and hydro storage data from the ESO API.""" begin = (target_datetime - timedelta(days=2)).strftime("%Y-%m-%d") end = (target_datetime).strftime("%Y-%m-%d") @@ -238,25 +248,29 @@ def parse_eso_production( ) production_mix = ProductionMix() storage_mix = StorageMix() - for production_mode in ESO_FUEL_MAPPING.keys(): + for production_mode in ESO_FUEL_MAPPING: if ESO_FUEL_MAPPING[production_mode] == "hydro storage": - storage_mix.add_value("hydro", event.get(production_mode)) - production_breakdown.append( - zoneKey=ZoneKey("GB"), - storage=storage_mix, - source=ESO_SOURCE, - datetime=event_datetime, - ) + storage_value = get_event_value(event, production_mode) + if storage_value: + storage_mix.add_value("hydro", event.get(production_mode)) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + storage=storage_mix, + source=ESO_SOURCE, + datetime=event_datetime, + ) else: - production_mix.add_value( - ESO_FUEL_MAPPING[production_mode], event.get(production_mode) - ) - production_breakdown.append( - zoneKey=ZoneKey("GB"), - 
production=production_mix, - source=ESO_SOURCE, - datetime=event_datetime, - ) + production_value = get_event_value(event, production_mode) + if production_value: + production_mix.add_value( + ESO_FUEL_MAPPING[production_mode], event.get(production_mode) + ) + production_breakdown.append( + zoneKey=ZoneKey("GB"), + production=production_mix, + source=ESO_SOURCE, + datetime=event_datetime, + ) all_production_breakdowns.append(production_breakdown) events = ProductionBreakdownList.merge_production_breakdowns( @@ -264,7 +278,8 @@ def parse_eso_production( ) return events -def parse_eso_hydro_storage(eso_data: list[dict[str, any]], logger: Logger ): + +def parse_eso_hydro_storage(eso_data: list[dict[str, any]], logger: Logger): """Parses only hydro storage data from the ESO API. This data will be merged with the B1620 data""" storage_breakdown = ProductionBreakdownList(logger=logger) for event in eso_data: @@ -272,19 +287,21 @@ def parse_eso_hydro_storage(eso_data: list[dict[str, any]], logger: Logger ): event.get("SETTLEMENT_DATE"), event.get("SETTLEMENT_PERIOD") ) storage_mix = StorageMix() - - storage_mix.add_value("hydro", float(event.get("PUMP_STORAGE_PUMPING"))) - storage_breakdown.append( - zoneKey=ZoneKey("GB"), - storage=storage_mix, - source=ESO_SOURCE, - datetime=event_datetime, - ) + storage_value = get_event_value(event, "PUMP_STORAGE_PUMPING") + if storage_value: + storage_mix.add_value("hydro", storage_value) + storage_breakdown.append( + zoneKey=ZoneKey("GB"), + storage=storage_mix, + source=ESO_SOURCE, + datetime=event_datetime, + ) return storage_breakdown + def query_production_fuelhh( session: Session, target_datetime: datetime, logger: Logger -) -> list[dict[str, any]]: +) -> list[dict[str, Any]]: """Fetches production data from the FUELHH endpoint. This endpoint provides the half-hourly generation outturn (Generation By Fuel type) to give our users an indication of the generation outturn for Great Britain. 
@@ -407,21 +424,7 @@ def fetch_production( # add hydro pumping data from ESO (B1620 only includes pumped storage production (injected on the grid) and not the pumping (withdrawn from the grid) eso_data = query_additional_eso_data(target_datetime, session, logger) parsed_hydro_storage_data = parse_eso_hydro_storage(eso_data, logger) - breakpoint() data = ProductionBreakdownList.merge_production_breakdowns( [data, parsed_hydro_storage_data], logger, matching_timestamps_only=True ).to_list() - required = ["coal", "gas", "nuclear", "wind"] - expected_range = { - # Historical data might be above the current capacity for coal - "coal": (0, 20000), - "gas": (100, 60000), - "nuclear": (100, 56000), - "wind": (0, 600000), - } - data = [ - x - for x in data - if validate(x, logger, required=required, expected_range=expected_range) - ] return data From 7ab4d30344888ef63e46eb5363c2960d65692534 Mon Sep 17 00:00:00 2001 From: Mathilde Daugy Date: Wed, 27 Mar 2024 15:34:46 +0100 Subject: [PATCH 05/11] add validation in fetch_production --- parsers/ELEXON.py | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py index c20f2a0253..952ccdc2fc 100644 --- a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -200,6 +200,7 @@ def parse_production( events = ProductionBreakdownList.merge_production_breakdowns( all_production_breakdowns, logger ) + breakpoint() return events @@ -219,7 +220,7 @@ def _create_eso_historical_demand_index(session: Session) -> dict[int, str]: def query_additional_eso_data( - target_datetime: datetime, session: Session, logger: Logger + target_datetime: datetime, session: Session ) -> list[dict[str, Any]]: """Fetches embedded wind and solar and hydro storage data from the ESO API.""" begin = (target_datetime - timedelta(days=2)).strftime("%Y-%m-%d") @@ -279,7 +280,9 @@ def parse_eso_production( return events -def parse_eso_hydro_storage(eso_data: list[dict[str, any]], logger: Logger): +def parse_eso_hydro_storage( + eso_data: list[dict[str, any]], logger: Logger +) -> ProductionBreakdownList: """Parses only hydro storage data from the ESO API. 
This data will be merged with the B1620 data"""
     storage_breakdown = ProductionBreakdownList(logger=logger)
     for event in eso_data:
@@ -322,10 +325,10 @@ def query_production_fuelhh(
 
 def query_and_merge_production_fuelhh_and_eso(
     session: Session, target_datetime: datetime, logger: Logger
-):
+) -> list[dict[str, Any]]:
     events_fuelhh = query_production_fuelhh(session, target_datetime, logger)
     parsed_events_fuelhh = parse_production(events_fuelhh, logger, "FUELHH")
-    events_eso = query_additional_eso_data(target_datetime, session, logger)
+    events_eso = query_additional_eso_data(target_datetime, session)
     parsed_events_eso = parse_eso_production(events_eso, logger)
 
     merged_events = ProductionBreakdownList.merge_production_breakdowns(
@@ -415,16 +418,32 @@ def fetch_production(
         message=f"Production data is not available before {ELEXON_START_DATE.date()}",
     )
 
-    data = query_production(session, target_datetime, logger)
-    if not len(data):
+    data_b1620 = query_production(session, target_datetime, logger)
+
+    if not validate_bmrs_data(data_b1620):
         data = query_and_merge_production_fuelhh_and_eso(
             session, target_datetime, logger
         )
     else:
         # add hydro pumping data from ESO (B1620 only includes pumped storage production (injected on the grid) and not the pumping (withdrawn from the grid)
-        eso_data = query_additional_eso_data(target_datetime, session, logger)
+        eso_data = query_additional_eso_data(target_datetime, session)
+        breakpoint()
         parsed_hydro_storage_data = parse_eso_hydro_storage(eso_data, logger)
         data = ProductionBreakdownList.merge_production_breakdowns(
-            [data, parsed_hydro_storage_data], logger, matching_timestamps_only=True
+            [data_b1620, parsed_hydro_storage_data],
+            logger,
+            matching_timestamps_only=True,
         ).to_list()
     return data
+
+
+def validate_bmrs_data(data: ProductionBreakdownList):
+    """Check if the PowerProductionBreakdown event contains a full power breakdown or just wind and solar or if data is missing."""
+    if not data:
+        return False
+    available_production_modes = []
+    for event in data.to_list():
+        available_production_modes += [*event["production"].keys()]
+    if "gas" not in set(available_production_modes):
+        return False
+    return True

From 71166547846bed481861eebb4f8ed3e895fbc5f1 Mon Sep 17 00:00:00 2001
From: Mathilde Daugy
Date: Wed, 27 Mar 2024 15:48:02 +0100
Subject: [PATCH 06/11] update parse_datetime

---
 parsers/ELEXON.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py
index 952ccdc2fc..7a80574e70 100644
--- a/parsers/ELEXON.py
+++ b/parsers/ELEXON.py
@@ -109,7 +109,14 @@ def query_elexon(url: str, session: Session, params: dict) -> list:
     return r.json()
 
 
-def parse_datetime(settlementDate: str, settlementPeriod: int) -> datetime:
+def parse_datetime(
+    settlementDate: str | None, settlementPeriod: int | None
+) -> datetime:
+    if settlementDate is None or settlementPeriod is None:
+        raise ParserException(
+            parser="ELEXON",
+            message="Error fetching data: settlementDate or settlementPeriod is None",
+        )
     parsed_datetime = datetime.strptime(settlementDate, "%Y-%m-%d")
     parsed_datetime += timedelta(hours=(settlementPeriod - 1) / 2)
     return parsed_datetime.replace(tzinfo=timezone.utc)

From 1c36db7873cd3ed3f2c7b18ca5ad031ee417f276 Mon Sep 17 00:00:00 2001
From: Mathilde Daugy
Date: Wed, 27 Mar 2024 15:51:45 +0100
Subject: [PATCH 07/11] remove breakpoint

---
 parsers/ELEXON.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py
index 7a80574e70..c00210c676 100644
---
a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -207,7 +207,6 @@ def parse_production( events = ProductionBreakdownList.merge_production_breakdowns( all_production_breakdowns, logger ) - breakpoint() return events @@ -434,7 +433,6 @@ def fetch_production( else: # add hydro pumping data from ESO (B1620 only includes pumped storage production (injected on the grid) and not the pumping (withdrawn from the grid)) eso_data = query_additional_eso_data(target_datetime, session) - breakpoint() parsed_hydro_storage_data = parse_eso_hydro_storage(eso_data, logger) data = ProductionBreakdownList.merge_production_breakdowns( [data_b1620, parsed_hydro_storage_data], From 127efa3a8abe21ee9b9c9ac1c4df147986eae213 Mon Sep 17 00:00:00 2001 From: Viktor Andersson <30777521+VIKTORVAV99@users.noreply.github.com> Date: Wed, 22 May 2024 19:58:32 +0200 Subject: [PATCH 08/11] fix any types --- parsers/ELEXON.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py index c00210c676..3564515cbe 100644 --- a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -245,7 +245,7 @@ def query_additional_eso_data( def parse_eso_production( - production_data: list[dict[str, any]], logger: Logger + production_data: list[dict[str, Any]], logger: Logger ) -> ProductionBreakdownList: all_production_breakdowns: list[ProductionBreakdownList] = [] for event in production_data: @@ -287,7 +287,7 @@ def parse_eso_production( def parse_eso_hydro_storage( - eso_data: list[dict[str, any]], logger: Logger + eso_data: list[dict[str, Any]], logger: Logger ) -> ProductionBreakdownList: """Parses only hydro storage data from the ESO API. This data will be merged with the B1620 data""" storage_breakdown = ProductionBreakdownList(logger=logger) for event in eso_data: From ceb40c16446d618ce172fc7866919f26783e17ae Mon Sep 17 00:00:00 2001 From: Viktor Andersson <30777521+VIKTORVAV99@users.noreply.github.com> Date: Wed, 22 May 2024 20:04:24 +0200 Subject: [PATCH 09/11] correct negative with 0 --- parsers/ELEXON.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py index 3564515cbe..3ade3cbbf9 100644 --- a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -195,7 +195,9 @@ def parse_production( else: production_value = get_event_value(event, quantity_key) if production_value: - production_mix.add_value(production_mode, production_value) + production_mix.add_value( + production_mode, production_value, correct_negative_with_zero=True + ) production_breakdown.append( zoneKey=ZoneKey("GB"), production=production_mix, @@ -270,7 +272,9 @@ def parse_eso_production( production_value = get_event_value(event, production_mode) if production_value: production_mix.add_value( - ESO_FUEL_MAPPING[production_mode], event.get(production_mode) + ESO_FUEL_MAPPING[production_mode], + event.get(production_mode), + correct_negative_with_zero=True, ) production_breakdown.append( zoneKey=ZoneKey("GB"), From ac22fd1b7c0192f2e2f33a02ac46301b307c5cc0 Mon Sep 17 00:00:00 2001 From: Viktor Andersson <30777521+VIKTORVAV99@users.noreply.github.com> Date: Wed, 22 May 2024 20:06:40 +0200 Subject: [PATCH 10/11] only merge matching timestamps --- parsers/ELEXON.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py index 3ade3cbbf9..b24c77826e 100644 --- a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -342,7 +342,7 @@ def query_and_merge_production_fuelhh_and_eso( parsed_events_eso = parse_eso_production(events_eso, logger) merged_events =
ProductionBreakdownList.merge_production_breakdowns( - [parsed_events_fuelhh, parsed_events_eso], logger + [parsed_events_fuelhh, parsed_events_eso], logger, matching_timestamps_only=True ) return merged_events.to_list() From e20d3954408db0f6009d8018ef994ab1156bc0c0 Mon Sep 17 00:00:00 2001 From: Viktor Andersson <30777521+VIKTORVAV99@users.noreply.github.com> Date: Wed, 22 May 2024 20:17:32 +0200 Subject: [PATCH 11/11] call to_list() in last function --- parsers/ELEXON.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/parsers/ELEXON.py b/parsers/ELEXON.py index b24c77826e..a085d1b0fd 100644 --- a/parsers/ELEXON.py +++ b/parsers/ELEXON.py @@ -329,13 +329,12 @@ def query_production_fuelhh( } fuelhh_data = query_elexon(ELEXON_URLS["production_fuelhh"], session, params) - return fuelhh_data def query_and_merge_production_fuelhh_and_eso( session: Session, target_datetime: datetime, logger: Logger -) -> list[dict[str, Any]]: +) -> ProductionBreakdownList: events_fuelhh = query_production_fuelhh(session, target_datetime, logger) parsed_events_fuelhh = parse_production(events_fuelhh, logger, "FUELHH") events_eso = query_additional_eso_data(target_datetime, session) @@ -344,12 +343,12 @@ def query_and_merge_production_fuelhh_and_eso( merged_events = ProductionBreakdownList.merge_production_breakdowns( [parsed_events_fuelhh, parsed_events_eso], logger, matching_timestamps_only=True ) - return merged_events.to_list() + return merged_events def query_exchange( zone_key: ZoneKey, session: Session, target_datetime: datetime, logger: Logger -) -> list: +) -> ExchangeList: all_exchanges: list[ExchangeList] = [] for interconnector in ZONEKEY_TO_INTERCONNECTOR[zone_key]: exchange_params = { @@ -382,8 +381,7 @@ def query_exchange( datetime=event_datetime, ) all_exchanges.append(exchange_list) - events = ExchangeList.merge_exchanges(all_exchanges, logger) - return events.to_list() + return ExchangeList.merge_exchanges(all_exchanges, logger) @refetch_frequency(timedelta(days=1)) @@ -406,7 +404,7 @@ def fetch_exchange( ) exchange_data = query_exchange(exchangeKey, session, target_datetime, logger) - return exchange_data + return exchange_data.to_list() @refetch_frequency(timedelta(days=2)) @@ -442,8 +440,8 @@ def fetch_production( [data_b1620, parsed_hydro_storage_data], logger, matching_timestamps_only=True, - ).to_list() - return data + ) + return data.to_list() def validate_bmrs_data(data: ProductionBreakdownList):
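A note on the settlement-period arithmetic used by parse_datetime (PATCH 06): BMRS reports timestamps as a settlement date plus a half-hourly settlement period, so period 1 maps to 00:00 and period 48 to 23:30 of the settlement day. A minimal standalone sketch of that conversion, mirroring the patched helper (the function name and example dates are illustrative, not part of the patch):

from datetime import datetime, timedelta, timezone

def settlement_to_datetime(settlement_date: str, settlement_period: int) -> datetime:
    # Period 1 -> 00:00, period 2 -> 00:30, ..., period 48 -> 23:30,
    # i.e. an offset of (period - 1) / 2 hours from midnight, tagged as UTC.
    parsed = datetime.strptime(settlement_date, "%Y-%m-%d")
    parsed += timedelta(hours=(settlement_period - 1) / 2)
    return parsed.replace(tzinfo=timezone.utc)

assert settlement_to_datetime("2024-03-27", 1) == datetime(2024, 3, 27, 0, 0, tzinfo=timezone.utc)
assert settlement_to_datetime("2024-03-27", 48) == datetime(2024, 3, 27, 23, 30, tzinfo=timezone.utc)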
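The fallback logic added in PATCH 05 rests on one heuristic: B1620 occasionally returns only wind and solar, so validate_bmrs_data treats the presence of a "gas" value anywhere in the parsed events as evidence of a full breakdown, and otherwise fetch_production falls back to FUELHH plus ESO data. A self-contained sketch of the same check over plain event dicts (the helper name and the simplified event shape are assumptions; the real function walks a ProductionBreakdownList):

from typing import Any

def has_full_breakdown(events: list[dict[str, Any]]) -> bool:
    # Collect every production mode reported across all events and
    # require "gas", which only appears in a complete B1620 response.
    if not events:
        return False
    modes: set[str] = set()
    for event in events:
        modes.update(event.get("production", {}).keys())
    return "gas" in modes

# Wind/solar-only data should trigger the FUELHH + ESO fallback:
assert not has_full_breakdown([{"production": {"wind": 5000.0, "solar": 1200.0}}])
assert has_full_breakdown([{"production": {"gas": 9000.0, "wind": 5000.0}}])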
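PATCH 09 switches both parsers to add_value(..., correct_negative_with_zero=True). The intent, assuming the flag behaves as its name suggests in the contrib event models, is that small negative readings (for example metering noise) are clamped to zero rather than recorded as negative production. A rough sketch of that clamping as a hypothetical standalone helper, not the library implementation:

def add_production_value(
    mix: dict[str, float],
    mode: str,
    value: float | None,
    correct_negative_with_zero: bool = False,
) -> None:
    # Skip missing readings; clamp negatives to 0.0 when requested.
    if value is None:
        return
    if correct_negative_with_zero and value < 0:
        value = 0.0
    mix[mode] = mix.get(mode, 0.0) + value

mix: dict[str, float] = {}
add_production_value(mix, "coal", -3.0, correct_negative_with_zero=True)
assert mix["coal"] == 0.0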