Skip to content

Commit

Permalink
Updates 2024-08-15 - Refactoring and finished London Datastore asset
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Aug 15, 2024
1 parent 6c492ae commit 48e3c3a
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 25 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -1,19 +1,91 @@
import requests
import json
import pandas as pd

from ...utils.variables_helper.url_links import asset_urls
from pprint import pprint
from dagster import asset, AssetIn, AssetExecutionContext
from ...models.catalogue_metadata_models.london_datastore import (
LondonDatastoreCatalogue,
)


@asset(group_name="metadata_catalogues", io_manager_key="S3Json")
def london_datastore_bronze(context: AssetExecutionContext):
    """
    Fetch the raw London Datastore catalogue metadata (bronze layer).

    Downloads the full catalogue export as JSON and returns it unmodified;
    the S3Json IO manager persists the payload to the bronze bucket.

    Args:
        context: Dagster execution context, used only for logging here.

    Returns:
        The decoded JSON payload — a list of catalogue item dicts.

    Raises:
        requests.HTTPError: if the export endpoint returns a non-2xx status.
    """
    # NOTE(review): URL is hard-coded although asset_urls defines a
    # "london_data_store" entry — consider switching to the shared lookup
    # so the link lives in one place.
    url = "https://data.london.gov.uk/api/datasets/export.json"
    response = requests.get(url)
    response.raise_for_status()
    data = json.loads(response.content)
    context.log.info(f"There were: {len(data)} catalogue items.")
    return data


@asset(
    group_name="metadata_catalogues",
    io_manager_key="DeltaLake",
    metadata={"mode": "overwrite"},
    ins={"london_datastore_bronze": AssetIn("london_datastore_bronze")},
)
def london_datastore_silver(context: AssetExecutionContext, london_datastore_bronze):
    """
    Process London Datastore metadata into the silver bucket.

    Validates the raw bronze payload against the Pydantic catalogue model,
    then flattens it to one row per (dataset, resource) pair. Datasets with
    no resources therefore contribute no rows.

    Args:
        context: Dagster execution context, used for logging.
        london_datastore_bronze: raw list of catalogue item dicts produced
            by the bronze asset.

    Returns:
        A pandas DataFrame with every column cast to str, ready for the
        DeltaLake IO manager (overwrite mode).
    """
    # Validate the data using the Pydantic model
    validated = LondonDatastoreCatalogue.model_validate(
        {"items": london_datastore_bronze}
    )

    rows = []
    for item in validated.items:
        # Dataset-level columns, repeated on every resource row.
        base_data = {
            "id": item.id,
            "title": item.title,
            "description": item.description,
            "author": item.author,
            "author_email": item.author_email,
            "maintainer": item.maintainer,
            "maintainer_email": item.maintainer_email,
            "licence": item.licence,
            "licence_notes": item.licence_notes,
            "update_frequency": item.update_frequency,
            "slug": item.slug,
            "state": item.state,
            "createdAt": item.createdAt,
            "updatedAt": item.updatedAt,
            "london_smallest_geography": item.london_smallest_geography,
            # List-valued fields are flattened to comma-separated strings.
            "tags": ", ".join(item.tags) if item.tags else "",
            "topics": ", ".join(item.topics) if item.topics else "",
            "shares": str(item.shares),
        }

        for resource_id, resource in item.resources.items():
            resource_data = {
                "resource_id": resource_id,
                "resource_title": resource.title,
                "resource_format": resource.format,
                "resource_url": resource.url,
                "resource_description": resource.description,
                "resource_check_hash": resource.check_hash,
                "resource_check_size": resource.check_size,
                "resource_check_timestamp": resource.check_timestamp,
            }
            rows.append({**base_data, **resource_data})

    df = pd.DataFrame(rows)

    # Distinct labels per log line (previously three were all "Overview 2").
    context.log.info(f"Head: {df.head(25)}")
    context.log.info(f"Columns: {df.columns}")
    context.log.info(f"Dtypes: {df.dtypes}")
    context.log.info(f"Shape: {df.shape}")

    # Uniform string schema keeps the DeltaLake write simple and stable.
    df = df.astype(str)
    return df
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# WORK IN PROGRESS

# from entsoe import EntsoePandasClient
# import pandas as pd


# def test_response():
# SECURITY(review): a real-looking API key was committed here; it has been
# redacted and should be rotated. Load secrets from the environment instead.
# client = EntsoePandasClient(api_key=os.environ["ENTSOE_API_KEY"])
# country_code = "10Y1001A1001A92E"
# start = pd.Timestamp("20240115", tz="Europe/London")
# end = pd.Timestamp("20240316", tz="Europe/London")
# return client.query_generation_per_plant(
# country_code, start=start, end=end, psr_type=None, include_eic=False
# )

# v = test_response()
# print(v)
Empty file.
Empty file.
Empty file.
Empty file.
5 changes: 4 additions & 1 deletion analytics_platform_dagster/jobs/analytics_platfom_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
)

# Metadata
# Job covering both layers of the London Datastore catalogue pipeline
# (bronze fetch, then silver flatten/validate).
metadata_job_1 = define_asset_job(
    name="metadata_job_1",
    selection=["london_datastore_bronze", "london_datastore_silver"],
)

# Energy
energy_job_1 = define_asset_job(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,53 @@
from pydantic import BaseModel, Field
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Any
from datetime import datetime


class Resource(BaseModel):
    """A single downloadable resource attached to a catalogue dataset.

    Every field is Optional with a None default because the London
    Datastore export does not reliably populate them for all resources.
    (A previous revision declared `format`, `title`, `url` and
    `london_res_geo` twice with conflicting requiredness — only the last
    declaration took effect; this keeps one declaration per field.)
    """

    check_hash: Optional[str] = None
    check_http_status: Optional[int] = None
    check_mimetype: Optional[str] = None
    check_size: Optional[int] = None
    check_timestamp: Optional[datetime] = None
    format: Optional[str] = None
    description: Optional[str] = None
    london_release_date: Optional[str] = None
    london_res_geo: Optional[List[str]] = None
    order: Optional[int] = None
    resource_type: Optional[str] = None
    searchDescription: Optional[str] = None
    searchFilename: Optional[str] = None
    searchTitle: Optional[str] = None
    title: Optional[str] = None
    url: Optional[str] = None
    temporal_coverage_from: Optional[str] = None
    temporal_coverage_to: Optional[str] = None

class DatasetItem(BaseModel):
    """One dataset entry from the London Datastore catalogue export.

    `id`, `slug`, `title` and `resources` are required; everything else is
    optional because the export omits fields freely. `resources` maps a
    resource id to its Resource model.
    """

    author: Optional[str] = None
    author_email: Optional[str] = None
    createdAt: Optional[datetime] = None
    description: Optional[str] = None
    id: str
    licence: Optional[str] = None
    licence_notes: Optional[str] = None
    london_smallest_geography: Optional[str] = None
    maintainer: Optional[str] = None
    maintainer_email: Optional[str] = None
    parent: Optional[str] = None
    resources: Dict[str, Resource]
    shares: Optional[Dict[str, Any]] = None
    sharing: Optional[str] = None
    slug: str
    state: Optional[str] = None
    tags: Optional[List[str]] = None
    title: str
    topics: Optional[List[str]] = None
    update_frequency: Optional[str] = None
    updatedAt: Optional[datetime] = None
    london_bounding_box: Optional[str] = None
    # Source key uses a hyphen, which is not a valid Python identifier.
    odi_certificate: Optional[str] = Field(None, alias="odi-certificate")


class LondonDatastoreCatalogue(BaseModel):
    """Top-level wrapper: the full list of catalogue items.

    Matches the shape the silver asset validates against via
    ``model_validate({"items": ...})``.
    """

    items: List[DatasetItem]
Empty file.
Empty file.

0 comments on commit 48e3c3a

Please sign in to comment.