From 48e3c3a6ee642ff1c7bf22c9cc18049cecaa1d5a Mon Sep 17 00:00:00 2001 From: Chris Carlon Date: Thu, 15 Aug 2024 15:45:19 +0100 Subject: [PATCH] Updates 2024-08-15 - Refactoring and finished London Datastore asset --- .../catalogue_metadata_assets/__init__.py | 0 .../analytics_platform_datastore_assets.py | 90 +++++++++++++++++-- .../assets/energy_data_assets/__init__.py | 0 .../entose_uk_electricity.py | 17 ++++ .../environment_data_assets/__init__.py | 0 .../infrastructure_data_assets/__init__.py | 0 .../assets/location_data_assets/__init__.py | 0 .../assets/trade_data_assets/__init__.py | 0 .../jobs/analytics_platfom_jobs.py | 5 +- .../london_datastore.py | 35 ++++---- .../utils/pydantic_model_helper/__init__.py | 0 .../utils/variables_helper/__init__.py | 0 12 files changed, 122 insertions(+), 25 deletions(-) create mode 100644 analytics_platform_dagster/assets/catalogue_metadata_assets/__init__.py create mode 100644 analytics_platform_dagster/assets/energy_data_assets/__init__.py create mode 100644 analytics_platform_dagster/assets/energy_data_assets/entose_uk_electricity.py create mode 100644 analytics_platform_dagster/assets/environment_data_assets/__init__.py create mode 100644 analytics_platform_dagster/assets/infrastructure_data_assets/__init__.py create mode 100644 analytics_platform_dagster/assets/location_data_assets/__init__.py create mode 100644 analytics_platform_dagster/assets/trade_data_assets/__init__.py create mode 100644 analytics_platform_dagster/utils/pydantic_model_helper/__init__.py create mode 100644 analytics_platform_dagster/utils/variables_helper/__init__.py diff --git a/analytics_platform_dagster/assets/catalogue_metadata_assets/__init__.py b/analytics_platform_dagster/assets/catalogue_metadata_assets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/analytics_platform_dagster/assets/catalogue_metadata_assets/analytics_platform_datastore_assets.py b/analytics_platform_dagster/assets/catalogue_metadata_assets/analytics_platform_datastore_assets.py index 5a2e415..362ec96 100644 --- a/analytics_platform_dagster/assets/catalogue_metadata_assets/analytics_platform_datastore_assets.py +++ b/analytics_platform_dagster/assets/catalogue_metadata_assets/analytics_platform_datastore_assets.py @@ -1,19 +1,91 @@ import requests +import json import pandas as pd -from ...utils.variables_helper.url_links import asset_urls -from pprint import pprint +from dagster import asset, AssetIn, AssetExecutionContext +from ...models.catalogue_metadata_models.london_datastore import ( + LondonDatastoreCatalogue, +) -def london_datastore_bronze() -> pd.DataFrame: +@asset(group_name="metadata_catalogues", io_manager_key="S3Json") +def london_datastore_bronze(context: AssetExecutionContext): """ London Datastore Metadata bronze bucket """ - url = asset_urls.get("london_data_store") - response = requests.get(url) - response.raise_for_status() - data = response.json() - pprint(data) - df = pd.DataFrame(data) + try: + url = "https://data.london.gov.uk/api/datasets/export.json" + response = requests.get(url) + response.raise_for_status() + data = json.loads(response.content) + context.log.info(f"There were: {len(data)} catalogue items.") + return data + + except Exception as e: + raise e + + +@asset( + group_name="metadata_catalogues", + io_manager_key="DeltaLake", + metadata={"mode": "overwrite"}, + ins={"london_datastore_bronze": AssetIn("london_datastore_bronze")}, +) +def london_datastore_silver(context: AssetExecutionContext, london_datastore_bronze): + """ + Process London Datastore Metadata into silver bucket. + """ + + # Make sure bronze bucket data can be read in + input_data = london_datastore_bronze + + # Validate the data using the Pydantic model + validated = LondonDatastoreCatalogue.model_validate({"items": input_data}) + + # list to store data pre dataframe + rows = [] + + for item in validated.items: + base_data = { + "id": item.id, + "title": item.title, + "description": item.description, + "author": item.author, + "author_email": item.author_email, + "maintainer": item.maintainer, + "maintainer_email": item.maintainer_email, + "licence": item.licence, + "licence_notes": item.licence_notes, + "update_frequency": item.update_frequency, + "slug": item.slug, + "state": item.state, + "createdAt": item.createdAt, + "updatedAt": item.updatedAt, + "london_smallest_geography": item.london_smallest_geography, + "tags": ", ".join(item.tags) if item.tags else "", + "topics": ", ".join(item.topics) if item.topics else "", + "shares": str(item.shares), + } + + for resource_id, resource in item.resources.items(): + resource_data = { + "resource_id": resource_id, + "resource_title": resource.title, + "resource_format": resource.format, + "resource_url": resource.url, + "resource_description": resource.description, + "resource_check_hash": resource.check_hash, + "resource_check_size": resource.check_size, + "resource_check_timestamp": resource.check_timestamp, + } + rows.append({**base_data, **resource_data}) + + df = pd.DataFrame(rows) + + context.log.info(f"Overview: {df.head(25)}") + context.log.info(f"Overview 2: {df.columns}") + context.log.info(f"Overview 2: {df.dtypes}") + context.log.info(f"Overview 2: {df.shape}") + df = df.astype(str) return df diff --git a/analytics_platform_dagster/assets/energy_data_assets/__init__.py b/analytics_platform_dagster/assets/energy_data_assets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/analytics_platform_dagster/assets/energy_data_assets/entose_uk_electricity.py b/analytics_platform_dagster/assets/energy_data_assets/entose_uk_electricity.py new file mode 100644 index 0000000..b19ab5f --- /dev/null +++ b/analytics_platform_dagster/assets/energy_data_assets/entose_uk_electricity.py @@ -0,0 +1,17 @@ +# WORK IN PROGRESS + +# from entsoe import EntsoePandasClient +# import pandas as pd + + +# def test_response(): +# client = EntsoePandasClient(api_key="5f5dc606-7180-49b2-833f-040c3bc23c81") +# country_code = "10Y1001A1001A92E" +# start = pd.Timestamp("20240115", tz="Europe/London") +# end = pd.Timestamp("20240316", tz="Europe/London") +# return client.query_generation_per_plant( +# country_code, start=start, end=end, psr_type=None, include_eic=False +# ) + +# v = test_response() +# print(v) diff --git a/analytics_platform_dagster/assets/environment_data_assets/__init__.py b/analytics_platform_dagster/assets/environment_data_assets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/analytics_platform_dagster/assets/infrastructure_data_assets/__init__.py b/analytics_platform_dagster/assets/infrastructure_data_assets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/analytics_platform_dagster/assets/location_data_assets/__init__.py b/analytics_platform_dagster/assets/location_data_assets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/analytics_platform_dagster/assets/trade_data_assets/__init__.py b/analytics_platform_dagster/assets/trade_data_assets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/analytics_platform_dagster/jobs/analytics_platfom_jobs.py b/analytics_platform_dagster/jobs/analytics_platfom_jobs.py index 72d9f39..2201245 100644 --- a/analytics_platform_dagster/jobs/analytics_platfom_jobs.py +++ b/analytics_platform_dagster/jobs/analytics_platfom_jobs.py @@ -19,7 +19,10 @@ ) # Metadata -metadata_job_1 = define_asset_job(name="metadata_job_1", selection=["london_datastore"]) +metadata_job_1 = define_asset_job( + name="metadata_job_1", + selection=["london_datastore_bronze", "london_datastore_silver"], +) # Energy energy_job_1 = define_asset_job( diff --git a/analytics_platform_dagster/models/catalogue_metadata_models/london_datastore.py b/analytics_platform_dagster/models/catalogue_metadata_models/london_datastore.py index 675c5ff..8eeb0b2 100644 --- a/analytics_platform_dagster/models/catalogue_metadata_models/london_datastore.py +++ b/analytics_platform_dagster/models/catalogue_metadata_models/london_datastore.py @@ -1,26 +1,29 @@ from pydantic import BaseModel, Field -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Any from datetime import datetime class Resource(BaseModel): + format: str + title: str + url: str check_hash: Optional[str] = None check_http_status: Optional[int] = None check_mimetype: Optional[str] = None check_size: Optional[int] = None check_timestamp: Optional[datetime] = None - format: Optional[str] = None - london_res_geo: List[str] = Field(default_factory=list) + description: Optional[str] = None + london_release_date: Optional[str] = None + london_res_geo: Optional[List[str]] = None order: Optional[int] = None - resource_type: Optional[str] = None searchDescription: Optional[str] = None searchFilename: Optional[str] = None searchTitle: Optional[str] = None - title: Optional[str] = None - url: Optional[str] = None + temporal_coverage_from: Optional[str] = None + temporal_coverage_to: Optional[str] = None -class Dataset(BaseModel): +class DatasetItem(BaseModel): author: Optional[str] = None author_email: Optional[str] = None createdAt: Optional[datetime] = None @@ -28,21 +31,23 @@ class Dataset(BaseModel): id: str licence: Optional[str] = None licence_notes: Optional[str] = None - london_smallest_geography: Optional[str] = None maintainer: Optional[str] = None maintainer_email: Optional[str] = None - parent: Optional[str] = None - resources: Dict[str, Resource] = Field(default_factory=dict) - shares: Dict[str, Dict] = Field(default_factory=lambda: {"orgs": {}, "users": {}}) + resources: Dict[str, Resource] + shares: Optional[Dict[str, Any]] = None sharing: Optional[str] = None slug: str state: Optional[str] = None - tags: List[str] = Field(default_factory=list) + tags: Optional[List[str]] = None title: str - topics: List[str] = Field(default_factory=list) + topics: Optional[List[str]] = None update_frequency: Optional[str] = None updatedAt: Optional[datetime] = None + parent: Optional[str] = None + london_smallest_geography: Optional[str] = None + london_bounding_box: Optional[str] = None + odi_certificate: Optional[str] = Field(None, alias="odi-certificate") -class DataCollection(BaseModel): - datasets: List[Dataset] +class LondonDatastoreCatalogue(BaseModel): + items: List[DatasetItem] diff --git a/analytics_platform_dagster/utils/pydantic_model_helper/__init__.py b/analytics_platform_dagster/utils/pydantic_model_helper/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/analytics_platform_dagster/utils/variables_helper/__init__.py b/analytics_platform_dagster/utils/variables_helper/__init__.py new file mode 100644 index 0000000..e69de29