diff --git a/analytics_platform_dagster/__init__.py b/analytics_platform_dagster/__init__.py index 43b4b08..85e8a66 100644 --- a/analytics_platform_dagster/__init__.py +++ b/analytics_platform_dagster/__init__.py @@ -12,14 +12,16 @@ from .assets.energy_data_assets import ( analytics_platform_carbon_intensity_assets, entsog_uk_gas_assets, - ukpn_smart_metres + ukpn_smart_metres, + renewable_energy_planning ) from .assets.catalogue_metadata_assets import london_datastore, ukpn_datastore_roadmap from .assets.infrastructure_data_assets import ( national_charge_points_london_assets, uk_power_networks_live_faults, - national_charge_points_uk_assets + national_charge_points_uk_assets, + uk_heat_networks ) from .assets.location_data_assets import built_up_areas @@ -90,7 +92,9 @@ def get_env_var(var_name: str) -> str: national_charge_points_uk_assets, green_belt, built_up_areas, - ukpn_smart_metres + ukpn_smart_metres, + renewable_energy_planning, + uk_heat_networks ] ), jobs=[ diff --git a/analytics_platform_dagster/assets/energy_data_assets/renewable_energy_planning.py b/analytics_platform_dagster/assets/energy_data_assets/renewable_energy_planning.py new file mode 100644 index 0000000..e1a2a23 --- /dev/null +++ b/analytics_platform_dagster/assets/energy_data_assets/renewable_energy_planning.py @@ -0,0 +1,19 @@ +from typing import Optional +from dagster import asset, AssetExecutionContext + +from ...utils.variables_helper.url_links import asset_urls +from ...utils.etl_patterns.bronze_factory import bronze_asset_factory + +@asset( + name="renewable_energy_planning", + group_name="energy_assets", + io_manager_key="S3Parquet" +) +@bronze_asset_factory( + url_key="renewable_energy_planning", + asset_urls=asset_urls, + sheet_name="REPD" +) +def renewable_energy_planning_bronze(context: AssetExecutionContext) -> Optional[bytes]: + """Load Renewable Energy Planning Dataset as Parquet to bronze bucket""" + return None diff --git 
a/analytics_platform_dagster/assets/infrastructure_data_assets/uk_heat_networks.py b/analytics_platform_dagster/assets/infrastructure_data_assets/uk_heat_networks.py new file mode 100644 index 0000000..98e0af1 --- /dev/null +++ b/analytics_platform_dagster/assets/infrastructure_data_assets/uk_heat_networks.py @@ -0,0 +1,19 @@ +from typing import Optional +from dagster import asset, AssetExecutionContext + +from ...utils.variables_helper.url_links import asset_urls +from ...utils.etl_patterns.bronze_factory import bronze_asset_factory + +@asset( + name="uk_heat_networks_bronze", + group_name="infrastructure_assets", + io_manager_key="S3Parquet" +) +@bronze_asset_factory( + url_key="uk_heat_networks", + asset_urls=asset_urls, + sheet_name="Heat Networks" +) +def uk_heat_networks_bronze(context: AssetExecutionContext) -> Optional[bytes]: + """Load UK Heat Network Dataset as Parquet to bronze bucket""" + return None diff --git a/analytics_platform_dagster/utils/etl_patterns/bronze_factory.py b/analytics_platform_dagster/utils/etl_patterns/bronze_factory.py index 40b2b35..a168640 100644 --- a/analytics_platform_dagster/utils/etl_patterns/bronze_factory.py +++ b/analytics_platform_dagster/utils/etl_patterns/bronze_factory.py @@ -1,27 +1,67 @@ +from typing import Any, Callable, Dict, List, Optional, TypeVar, Union import polars as pl import requests import io -from typing import Any, Callable, Dict, List, Optional, TypeVar from pydantic import BaseModel, ValidationError from dagster import AssetExecutionContext from functools import wraps from builtins import bytes +from pathlib import Path T = TypeVar('T', bound=BaseModel) class BronzeETLBase: - """Base class for Bronze ETL operations""" + """Base class for Bronze ETL operations supporting both API and Excel data sources using Polars""" + def __init__(self, url_key: str, asset_urls: Dict[str, str]): self.url_key = url_key self.asset_urls = asset_urls - def fetch_api_data(self) -> Dict: + def _is_excel_url(self, url: str) 
-> bool: + """Check if URL points to an Excel file""" + return any(url.lower().endswith(ext) for ext in ['.xlsx', '.xls', '.xlsm']) + + def _process_excel_response(self, response: bytes, sheet_name: Optional[str] = None) -> Dict: + """Process Excel file from response bytes using Polars""" + try: + # Create a BytesIO object from the response content + excel_buffer = io.BytesIO(response) + + # Read Excel file into Polars DataFrame + if sheet_name: + df = pl.read_excel(excel_buffer, sheet_name=sheet_name) + else: + df = pl.read_excel(excel_buffer) + + # Convert to records format similar to API response + records = df.to_dicts() + + # Wrap in a dict with 'items' key to match API structure if needed + return {'items': records} + + except Exception as e: + raise ValueError(f"Error processing Excel file: {str(e)}") + + def fetch_data(self, sheet_name: Optional[str] = None) -> Dict: + """Fetch data from either API or Excel source + + Args: + sheet_name: Optional name of Excel sheet to read. If None, reads first sheet. 
+ + Returns: + Dict containing the data, with consistent structure regardless of source + """ url = self.asset_urls.get(self.url_key) if url is None: raise ValueError(f"URL for {self.url_key} not found in asset_urls") + response = requests.get(url) response.raise_for_status() - return response.json() + + if self._is_excel_url(url): + return self._process_excel_response(response.content, sheet_name) + else: + return response.json() def validate_data( self, @@ -31,7 +71,6 @@ def validate_data( ) -> List[Dict[str, Any]]: if model is None: return [] - try: if wrap_items: model.model_validate({"items": data}) @@ -53,8 +92,9 @@ def bronze_asset_factory( model: Optional[type[T]] = None, transform_func: Optional[Callable] = None, wrap_items: bool = False, + sheet_name: Optional[str] = None, ): - """Factory function for creating bronze assets + """Factory function for creating bronze assets with Excel support using Polars Args: url_key: Key to lookup URL in asset_urls @@ -62,14 +102,15 @@ def bronze_asset_factory( model: Optional Pydantic model for validation transform_func: Optional function to transform data wrap_items: Whether to wrap items for validation + sheet_name: Optional name of Excel sheet to read """ def decorator(func): @wraps(func) def wrapper(context: AssetExecutionContext) -> bytes: etl = BronzeETLBase(url_key, asset_urls) try: - # Fetch data - data = etl.fetch_api_data() + # Fetch data (now supports Excel with Polars) + data = etl.fetch_data(sheet_name=sheet_name) # Validate data only if model is provided validation_errors = etl.validate_data(data, model, wrap_items) if model else [] @@ -86,7 +127,7 @@ def wrapper(context: AssetExecutionContext) -> bytes: else: transformed_data = data - # Create DataFrame + # Create DataFrame using Polars df = pl.DataFrame(transformed_data) # Log some info about the data diff --git a/analytics_platform_dagster/utils/variables_helper/url_links.py b/analytics_platform_dagster/utils/variables_helper/url_links.py index 
6889fb9..329e742 100644 --- a/analytics_platform_dagster/utils/variables_helper/url_links.py +++ b/analytics_platform_dagster/utils/variables_helper/url_links.py @@ -11,5 +11,7 @@ "entsog_gas": "https://transparency.entsog.eu/api/v1/operationalData.json", "os_built_up_areas": "https://api.os.uk/downloads/v1/products/BuiltUpAreas/downloads?area=GB&format=GeoPackage&redirect", "ukpn_datastore_roadmap_bronze": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ukpn-external-facing-tracker/exports/json?lang=en&timezone=Europe%2FLondon", - "ukpn_smart_metres": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ukpn-smart-meter-installation-volumes/exports/json?lang=en&timezone=Europe%2FLondon" + "ukpn_smart_metres": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ukpn-smart-meter-installation-volumes/exports/json?lang=en&timezone=Europe%2FLondon", + "renewable_energy_planning": "https://assets.publishing.service.gov.uk/media/66fbf70430536cb927482a3d/repd-q2-july-2024.xlsx", + "uk_heat_networks": "https://assets.publishing.service.gov.uk/media/66bb52a10808eaf43b50e0f0/HNPD_2024_Q2.xlsx" } diff --git a/images/Dockerfile.dagster-daemon b/images/Dockerfile.dagster-daemon index caed205..e2c2125 100644 --- a/images/Dockerfile.dagster-daemon +++ b/images/Dockerfile.dagster-daemon @@ -34,7 +34,8 @@ RUN pip install --no-cache-dir \ shapely \ aiohttp \ openpyxl \ - polars + polars \ + fastexcel # Expose the Dagster daemon port EXPOSE 3000 diff --git a/images/Dockerfile.dagster-webserver b/images/Dockerfile.dagster-webserver index 68fa2b7..83613bd 100644 --- a/images/Dockerfile.dagster-webserver +++ b/images/Dockerfile.dagster-webserver @@ -34,7 +34,8 @@ RUN pip install --no-cache-dir \ shapely \ aiohttp \ openpyxl \ - polars + polars \ + fastexcel # Expose the Dagster webserver port EXPOSE 3000 diff --git a/pyproject.toml b/pyproject.toml index 40ee69d..086c41a 100644 --- a/pyproject.toml +++ 
b/pyproject.toml @@ -27,6 +27,7 @@ streamlit = "^1.38.0" ruff = "^0.6.7" openpyxl = "^3.1.5" polars = "^1.12.0" +fastexcel = "^0.12.0" [build-system]