From 173f637547082f1042ba4e3fedf80fe795e15f9d Mon Sep 17 00:00:00 2001 From: christophercarlon Date: Mon, 29 Jul 2024 00:03:21 +0100 Subject: [PATCH] Updates 2024-07-29 - Added in National Charge Point asset bronze data --- analytics_platform_dagster/__init__.py | 17 ++++-- ..._platform_national_charge_points_assets.py | 58 +++++++------------ .../jobs/analytics_platfom_jobs.py | 14 +++-- .../utils/requests_helper.py | 17 +++++- analytics_platform_dagster/utils/url_links.py | 3 +- 5 files changed, 61 insertions(+), 48 deletions(-) diff --git a/analytics_platform_dagster/__init__.py b/analytics_platform_dagster/__init__.py index af7d2aa..cfe3887 100644 --- a/analytics_platform_dagster/__init__.py +++ b/analytics_platform_dagster/__init__.py @@ -1,17 +1,21 @@ from dagster import Definitions, load_assets_from_modules + from .assets.location_data_assets import analytics_platform_os_assets from .assets.environment_data_assets import analytics_platform_ea_assets from .assets.trade_data_assets import analytics_platform_dbt_assets from .assets.energy_data_assets import analytics_platform_carbon_intensity_assets from .assets.catalogue_metadata_assets import analytics_platform_datastore_assets -from .utils.io_manager import S3ParquetManager, AwsWranglerDeltaLakeIOManager +from .assets.infrastructure_data_assets import analytics_platform_national_charge_points_assets + +from .utils.io_manager import S3ParquetManager, AwsWranglerDeltaLakeIOManager, S3JSONManager from .jobs.analytics_platfom_jobs import ( environment_job_1, trade_job_1, metadata_job_1, energy_job_1, - energy_job_1_daily + energy_job_1_daily, + infrastructure_job_1 ) defs = Definitions( @@ -20,7 +24,8 @@ analytics_platform_os_assets, analytics_platform_ea_assets, analytics_platform_dbt_assets, - analytics_platform_carbon_intensity_assets + analytics_platform_carbon_intensity_assets, + analytics_platform_national_charge_points_assets ]), jobs=[ @@ -28,10 +33,14 @@ trade_job_1, metadata_job_1, energy_job_1, + infrastructure_job_1, ], - schedules=[energy_job_1_daily], + schedules=[ + energy_job_1_daily + ], resources={ "S3Parquet": S3ParquetManager(bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5"), + "S3Json": S3JSONManager(bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5"), "DeltaLake": AwsWranglerDeltaLakeIOManager(bucket_name="datastackprod-silverdatabucket04c06b24-mrfdumn6njwe") }, ) diff --git a/analytics_platform_dagster/assets/infrastructure_data_assets/analytics_platform_national_charge_points_assets.py b/analytics_platform_dagster/assets/infrastructure_data_assets/analytics_platform_national_charge_points_assets.py index 616d49c..87a523e 100644 --- a/analytics_platform_dagster/assets/infrastructure_data_assets/analytics_platform_national_charge_points_assets.py +++ b/analytics_platform_dagster/assets/infrastructure_data_assets/analytics_platform_national_charge_points_assets.py @@ -1,41 +1,25 @@ -# from ...utils.requests_helper import return_api_data_json -# from ...utils.io_manager import S3JSONManager -# from ...models.infrastructure_data_models.national_charge_point_model import ChargeDeviceResponse -# from datetime import datetime -# from dagster import asset, AssetExecutionContext, AssetIn - -# API_ENDPOINT = "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ozev-ukpn-national-chargepoint-register/records?limit=50" -# def fetch_national_charge_point_data() -> ChargeDeviceResponse: -# url = API_ENDPOINT -# data = return_api_data_json(url) -# return ChargeDeviceResponse.model_validate(data, strict=True) +from typing import Any +from ...utils.requests_helper import stream_json +from ...utils.url_links import asset_urls +# from ...models.infrastructure_data_models.national_charge_point_model import ChargeDeviceResponse +from dagster import AssetExecutionContext, asset -# @asset( -# group_name="energy_assets", -# io_manager_key="S3Parquet" -# ) -# def national_charge_point_data_bronze(): -# response = fetch_national_charge_point_data() -# return response +DONWLOADLINK = asset_urls.get("national_charge_points") +def fetch_national_charge_point_data() -> Any: + try: + url = DONWLOADLINK + return stream_json(url, set_chunk=5000) + except Exception as error: + raise error -# def national_charge_point_data_silver(): -# # Fetch the data -# response = fetch_national_charge_point_data() - -# # Extract the results -# charge_devices = response.results - -# # Convert to a list of dictionaries -# data = [device.model_dump() for device in charge_devices] - -# # Create a DataFrame -# df = pd.DataFrame(data) - -# # Handle the nested geo_point structure -# df['lon'] = df['geo_point'].apply(lambda x: x['lon']) -# df['lat'] = df['geo_point'].apply(lambda x: x['lat']) -# df = df.drop('geo_point', axis=1) - -# return df \ No newline at end of file +@asset( + group_name="infrastructure_assets", + io_manager_key="S3Json" + ) +def national_charge_point_data_bronze(context: AssetExecutionContext) -> Any: + context.log.info("Started Processing Data") + response = fetch_national_charge_point_data() + context.log.info("Finished Processing Data") + return response diff --git a/analytics_platform_dagster/jobs/analytics_platfom_jobs.py b/analytics_platform_dagster/jobs/analytics_platfom_jobs.py index 04aa9c2..06be6fd 100644 --- a/analytics_platform_dagster/jobs/analytics_platfom_jobs.py +++ b/analytics_platform_dagster/jobs/analytics_platfom_jobs.py @@ -2,25 +2,25 @@ # Evironment environment_job_1 = define_asset_job( - name="analytics_platform_ea_assets", + name="environment_job_1", selection=["ea_floods", "ea_flood_areas"] ) # Trade trade_job_1 = define_asset_job( - name="analytics_platform_dbt_assets", + name="trade_job_1", selection=["dbt_trade_barriers", "dbt_trade_barriers_transform", "dbt_trade_barriers_delta_lake"] ) # Metadata metadata_job_1 = define_asset_job( - name="analytics_platform_datastore_assets", + name="metadata_job_1", selection=["london_datastore"] ) # Energy energy_job_1 = define_asset_job( - name="analytics_platform_carbon_intensity_assets", + name="energy_job_1", selection=["carbon_intensity_bronze", "carbon_intensity_silver"] ) @@ -30,3 +30,9 @@ execution_timezone="Europe/London", name="energy_daily_schedule" ) + +# Infrastructure +infrastructure_job_1 = define_asset_job( + name="infrastructure_job_1", + selection=["national_charge_point_data_bronze"] +) \ No newline at end of file diff --git a/analytics_platform_dagster/utils/requests_helper.py b/analytics_platform_dagster/utils/requests_helper.py index de1e127..4ba627a 100644 --- a/analytics_platform_dagster/utils/requests_helper.py +++ b/analytics_platform_dagster/utils/requests_helper.py @@ -1,6 +1,7 @@ import requests +import json -def return_api_data_json(url_link: str): +def return_json(url_link: str) -> json: try: response = requests.get(url_link) response.raise_for_status() @@ -8,4 +9,16 @@ def return_api_data_json(url_link: str): return data except requests.RequestException as error: print(f"An error occurred: {error}") - raise \ No newline at end of file + raise + + +def stream_json(url: str, set_chunk: int) -> json: + try: + response = requests.get(url, stream=True) + buffer = '' + for chunk in response.iter_content(set_chunk): + buffer += chunk.decode('utf-8') + return json.loads(buffer) + except requests.RequestException as error: + print(f"An error occurred: {error}") + raise diff --git a/analytics_platform_dagster/utils/url_links.py b/analytics_platform_dagster/utils/url_links.py index 800da3a..77b56ef 100644 --- a/analytics_platform_dagster/utils/url_links.py +++ b/analytics_platform_dagster/utils/url_links.py @@ -2,5 +2,6 @@ "dbt_asset":"https://data.api.trade.gov.uk/v1/datasets/market-barriers/versions/latest/data?format=json", "ea_flood_areas":"https://environment.data.gov.uk/flood-monitoring/id/floodAreas?_limit=9999", "ea_floods":"https://environment.data.gov.uk/flood-monitoring/id/floodAreas?_limit=9999", - "london_data_store":"https://data.london.gov.uk/api/datasets/export.json" + "london_data_store":"https://data.london.gov.uk/api/datasets/export.json", + "national_charge_points": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ozev-ukpn-national-chargepoint-register/exports/json?lang=en&timezone=Europe%2FLondon" }