Updates 2024-08-05 - Added Entsog data bronze
CHRISCARLON committed Aug 5, 2024
1 parent f68e6e8 commit b866142
Showing 7 changed files with 309 additions and 131 deletions.
72 changes: 43 additions & 29 deletions analytics_platform_dagster/__init__.py
@@ -3,46 +3,60 @@
from .assets.location_data_assets import analytics_platform_os_assets
from .assets.environment_data_assets import analytics_platform_ea_assets
from .assets.trade_data_assets import analytics_platform_dbt_trade_barrier_assets
from .assets.energy_data_assets import (
    analytics_platform_carbon_intensity_assets,
    entsog_gas_assets,
)
from .assets.catalogue_metadata_assets import analytics_platform_datastore_assets
from .assets.infrastructure_data_assets import (
    analytics_platform_national_charge_points_assets,
)

from .utils.io_manager_helper.io_manager import (
    S3ParquetManager,
    AwsWranglerDeltaLakeIOManager,
    S3JSONManager,
)

from .jobs.analytics_platfom_jobs import (
    environment_job_1,
    trade_job_1,
    trade_job_1_daily,
    metadata_job_1,
    energy_job_1,
    energy_job_1_daily,
    infrastructure_job_1,
)

defs = Definitions(
    assets=load_assets_from_modules(
        [
            analytics_platform_datastore_assets,
            analytics_platform_os_assets,
            analytics_platform_ea_assets,
            analytics_platform_dbt_trade_barrier_assets,
            analytics_platform_carbon_intensity_assets,
            analytics_platform_national_charge_points_assets,
            entsog_gas_assets,
        ]
    ),
    jobs=[
        environment_job_1,
        trade_job_1,
        metadata_job_1,
        energy_job_1,
        infrastructure_job_1,
    ],
    schedules=[energy_job_1_daily, trade_job_1_daily],
    resources={
        "S3Parquet": S3ParquetManager(
            bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5"
        ),
        "S3Json": S3JSONManager(
            bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5"
        ),
        "DeltaLake": AwsWranglerDeltaLakeIOManager(
            bucket_name="datastackprod-silverdatabucket04c06b24-mrfdumn6njwe"
        ),
    },
)
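
The S3ParquetManager, S3JSONManager, and AwsWranglerDeltaLakeIOManager wired in above live in utils/io_manager_helper and are not part of this diff. For orientation only, a minimal sketch of what an S3 JSON IO manager of this shape might look like with Dagster's ConfigurableIOManager; the key scheme and boto3 handling are assumptions, not the repository's actual implementation:

import json

import boto3
from dagster import ConfigurableIOManager, InputContext, OutputContext


class S3JSONManagerSketch(ConfigurableIOManager):
    """Hypothetical stand-in for S3JSONManager; the real class may differ."""

    bucket_name: str

    def _key(self, context) -> str:
        # Assumption: one object per asset, keyed by the asset's name
        return f"{context.asset_key.path[-1]}.json"

    def handle_output(self, context: OutputContext, obj) -> None:
        # Serialise the asset's return value and write it to the bucket
        boto3.client("s3").put_object(
            Bucket=self.bucket_name,
            Key=self._key(context),
            Body=json.dumps(obj).encode("utf-8"),
        )

    def load_input(self, context: InputContext):
        # Read the upstream asset's object back for downstream assets
        response = boto3.client("s3").get_object(
            Bucket=self.bucket_name, Key=self._key(context)
        )
        return json.loads(response["Body"].read())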
New file: the ENTSOG gas assets module (imported above as entsog_gas_assets).
@@ -0,0 +1,69 @@
import requests

from pydantic import ValidationError
from ...models.energy_data_models.entsog_gas_data_assets_model import EntsogModel
from dagster import AssetExecutionContext, asset, op
from datetime import datetime, timedelta


@op
def validate_model(entsog_data):
    """Validate the json payload against the Pydantic model"""
    try:
        EntsogModel.model_validate({"data": entsog_data}, strict=True)
    except ValidationError as e:
        print("Validation errors:")
        for error in e.errors():
            print(f"Field: {error['loc']}, Error: {error['msg']}")
        raise


@asset(group_name="energy_assets", io_manager_key="S3Json")
def entsog_gas_data_bronze(context: AssetExecutionContext):
    """Fetch the last week of ENTSOG physical flow data and put it in the bronze bucket"""
    # Base URL
    base_url = "https://transparency.entsog.eu/api/v1/operationalData.json"
    # Parameters that don't change
    params = {
        "pointDirection": "UK-TSO-0003ITP-00005exit,UK-TSO-0001ITP-00005entry,IE-TSO-0002ITP-00495exit,UK-TSO-0001ITP-00090entry,UK-LSO-0001LNG-00008exit,UK-TSO-0001LNG-00008entry,UK-LSO-0002LNG-00049exit,UK-TSO-0001LNG-00049entry,UK-LSO-0004LNG-00049exit,UK-TSO-0002ITP-00496exit,UK-TSO-0002ITP-00077exit,IE-TSO-0001ITP-00077entry,UK-TSO-0004ITP-00207exit,UK-TSO-0001ITP-00207entry,UK-TSO-0001PRD-00156entry,UK-TSO-0001PRD-00151entry,UK-TSO-0001PRD-00152entry,UK-TSO-0001LNG-00007entry,UK-TSO-0001ITP-00284entry,UK-SSO-0007UGS-00419exit,UK-TSO-0001LNG-00053entry,UK-SSO-0012UGS-00275exit,UK-TSO-0001UGS-00275entry,UK-TSO-0001UGS-00414entry,UK-TSO-0001UGS-00276entry,UK-TSO-0001UGS-00277entry,UK-SSO-0014UGS-00404exit,UK-TSO-0001UGS-00404entry,UK-LSO-0003UGS-00278exit,UK-TSO-0001UGS-00278entry,UK-SSO-0009UGS-00279exit,UK-TSO-0001UGS-00279entry,UK-SSO-0007UGS-00421exit,UK-TSO-0001LNG-00054entry,UK-SSO-0008UGS-00420exit,UK-TSO-0001UGS-00280entry,UK-TSO-0001ITP-00279entry,UK-SSO-0005UGS-00178exit,UK-TSO-0001UGS-00281entry,UK-SSO-0001UGS-00181exit,UK-TSO-0001UGS-00181entry,UK-SSO-0009UGS-00423exit,UK-TSO-0001UGS-00282entry,UK-SSO-0007UGS-00422exit,UK-TSO-0001LNG-00055entry,UK-TSO-0001UGS-00235entry,UK-SSO-0003UGS-00216exit,UK-TSO-0001UGS-00216entry,UK-SSO-0016UGS-00289exit,UK-TSO-0001UGS-00289entry,NO-TSO-0001ITP-00091exit,UK-TSO-0001ITP-00091entry,NO-TSO-0001ITP-00022exit,UK-TSO-0001ITP-00022entry,UK-TSO-0001ITP-00436entry,UK-TSO-0001ITP-00492exit,UK-TSO-0001ITP-00492entry,UK-TSO-0001ITP-00090exit,IE-TSO-0001ITP-00496entry,UK-TSO-0001VTP-00006exit,UK-TSO-0001VTP-00006entry",
        "indicator": "Physical Flow",
        "periodType": "day",
        "timezone": "CET",
        "limit": -1,
        "dataset": 1,
        "formatType": "json",
    }
    # Calculate the date range: the last seven days
    today = datetime.now().date()
    week_ago = today - timedelta(days=7)
    # Add date parameters
    params["from"] = week_ago.strftime("%Y-%m-%d")
    params["to"] = today.strftime("%Y-%m-%d")

    try:
        # Make the request; the timeout stops a hung connection blocking the run
        response = requests.get(base_url, params=params, timeout=30)
        response.raise_for_status()

        # Parse the JSON and validate it against the model
        data = response.json()
        operational_data = data["operationalData"]
        validate_model(operational_data)

        return operational_data
    except requests.RequestException as e:
        # Handle any requests-related errors
        context.log.error(f"Error fetching data: {e}")
        raise
    except KeyError as e:
        # Handle a missing key in the JSON response
        context.log.error(f"Error parsing JSON response: {e}")
        raise
    except ValidationError as e:
        # Handle validation errors
        context.log.error(f"Data validation error: {e}")
        raise
    except Exception as e:
        # Handle any other unexpected errors
        context.log.error(f"Unexpected error: {e}")
        raise
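
EntsogModel is imported from models/energy_data_models and is not visible in this commit. For context, a loose sketch of the shape it would need in order to accept the {"data": [...]} wrapper used by validate_model; the field names below are guesses based on ENTSOG's operationalData payload, not the actual model:

from typing import List, Optional

from pydantic import BaseModel


class EntsogRecord(BaseModel):
    # Hypothetical field subset; real operationalData records carry more
    pointKey: Optional[str] = None
    pointLabel: Optional[str] = None
    directionKey: Optional[str] = None
    indicator: Optional[str] = None
    periodFrom: Optional[str] = None
    periodTo: Optional[str] = None
    value: Optional[float] = None


class EntsogModel(BaseModel):
    # validate_model wraps the record list as {"data": [...]}
    data: List[EntsogRecord]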
Changed file: the national charge points assets module (analytics_platform_national_charge_points_assets).
@@ -1,99 +1,122 @@
import pandas as pd

from pydantic import ValidationError
from typing import List, Dict, Any
from ...utils.requests_helper.requests_helper import stream_json, return_json
from ...utils.variables_helper.url_links import asset_urls
from ...models.infrastructure_data_models.national_charge_point_model import (
    ChargeDevice,
)
from dagster import AssetExecutionContext, AssetIn, asset

DOWNLOAD_LINK = asset_urls.get("national_charge_points")
API_ENDPOINT = asset_urls.get("national_charge_points_api")


def fetch_record_count() -> int:
    """Fetches a small sample of charge point data to read off the total record count

    Returns:
        int: The total count of records
    """
    if API_ENDPOINT is None:
        raise ValueError("API_ENDPOINT can't be None")

    try:
        json_data = return_json(API_ENDPOINT)
        return json_data.get("total_count")
    except Exception as error:
        raise error


def fetch_national_charge_point_data() -> List[Dict[str, Any]]:
    """Streams charge point data into memory

    Returns:
        A list of dictionaries
    """
    if DOWNLOAD_LINK is None:
        raise ValueError("Download link can't be None")

    try:
        return stream_json(DOWNLOAD_LINK, set_chunk=5000)
    except Exception as error:
        raise error
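
stream_json and return_json come from the shared requests helper and are not shown in this diff. A minimal sketch of how a chunked fetch like stream_json(url, set_chunk=5000) might be built with ijson; the payload prefix ("ChargeDevice") and the role of set_chunk are assumptions:

from typing import Any, Dict, List

import ijson
import requests


def stream_json(url: str, set_chunk: int = 5000) -> List[Dict[str, Any]]:
    """Stream a large JSON payload record by record instead of loading it whole."""
    records: List[Dict[str, Any]] = []
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        response.raw.decode_content = True  # undo gzip/deflate transfer encoding
        # Assumption: the registry payload looks like {"ChargeDevice": [...]}
        for item in ijson.items(response.raw, "ChargeDevice.item"):
            records.append(item)
            if len(records) % set_chunk == 0:
                # set_chunk is assumed to pace progress reporting
                print(f"Streamed {len(records)} records...")
    return records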

@asset(group_name="infrastructure_assets", io_manager_key="S3Json")
def national_charge_point_data_bronze(context: AssetExecutionContext) -> List[Dict]:
    """Uploads charge point data to S3 using the IO Manager.

    Fetches the json data and compares it with the total record count to ensure
    that the correct number of records was fetched.

    Returns:
        A list of dictionaries
    """
    response = fetch_national_charge_point_data()
    total_record_count = fetch_record_count()
    response_count = len(response)

    # Check that all the data was returned and we haven't double counted
    # and/or lost records
    if total_record_count == response_count:
        context.log.info(f"Record counts match: {total_record_count}")
    else:
        context.log.warning(
            f"Record count mismatch: Expected {total_record_count}, got {response_count}"
        )

    return response
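
For a quick end-to-end check of the bronze asset during development, Dagster's materialize helper can drive it outside a full deployment. The bucket name below is a placeholder, and the run assumes network access to the registry:

from dagster import materialize

from analytics_platform_dagster.utils.io_manager_helper.io_manager import S3JSONManager

result = materialize(
    [national_charge_point_data_bronze],
    resources={"S3Json": S3JSONManager(bucket_name="my-dev-bronze-bucket")},
)
assert result.success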


@asset(
    group_name="infrastructure_assets",
    io_manager_key="DeltaLake",
    metadata={"mode": "overwrite"},
    ins={
        "national_charge_point_data_bronze": AssetIn(
            "national_charge_point_data_bronze"
        )
    },
)
def national_charge_point_data_silver(
    context: AssetExecutionContext, national_charge_point_data_bronze
) -> pd.DataFrame:
    """Writes charge point data out to Delta Lake once the Pydantic model has been validated

    Returns:
        pd.DataFrame
    """
    try:
        # Create a DataFrame directly from the input data
        df = pd.DataFrame(national_charge_point_data_bronze)

        # Validate a 5% sample of the records against the Pydantic model
        # (raise or lower as needed; 0.01 would be 1%, for example)
        sample_size = int(len(df) * 0.05)
        sample = df.sample(n=sample_size)

        for _, record in sample.iterrows():
            try:
                ChargeDevice.model_validate(record.to_dict())
            except ValidationError as e:
                context.log.warning(f"Validation error in sample: {e}")

        # Log some information about the DataFrame to check it all worked
        context.log.info(
            f"Created DataFrame with {len(df)} rows and {len(df.columns)} columns"
        )
        context.log.info(f"DataFrame sample: {df.head(15)}")
        return df

    except Exception as e:
        context.log.error(f"Error processing data: {e}")
        raise
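
ChargeDevice, like the other Pydantic models referenced here, is defined outside this diff. A loose sketch of its probable shape; the field names are guesses from the National Chargepoint Registry schema, and extra="allow" reflects that registry rows carry far more columns than shown:

from typing import Optional

from pydantic import BaseModel, ConfigDict


class ChargeDevice(BaseModel):
    # Hypothetical subset of the registry's columns; the real model may be stricter
    model_config = ConfigDict(extra="allow")

    ChargeDeviceId: Optional[str] = None
    ChargeDeviceName: Optional[str] = None
    ChargeDeviceStatus: Optional[str] = None
    DateCreated: Optional[str] = None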
31 changes: 17 additions & 14 deletions analytics_platform_dagster/jobs/analytics_platfom_jobs.py
@@ -2,44 +2,47 @@

# Environment
environment_job_1 = define_asset_job(
    name="environment_job_1", selection=["ea_floods", "ea_flood_areas"]
)

# Trade
trade_job_1 = define_asset_job(
    name="trade_job_1",
    selection=["dbt_trade_barriers_bronze", "dbt_trade_barriers_silver"],
)

trade_job_1_daily = ScheduleDefinition(
    job=trade_job_1,
    cron_schedule="0 0 * * *",
    execution_timezone="Europe/London",
    name="trade_daily_schedule",
)

# Metadata
metadata_job_1 = define_asset_job(name="metadata_job_1", selection=["london_datastore"])

# Energy
energy_job_1 = define_asset_job(
    name="energy_job_1",
    selection=[
        "carbon_intensity_bronze",
        "carbon_intensity_silver",
        "entsog_gas_data_bronze",
    ],
)

energy_job_1_daily = ScheduleDefinition(
    job=energy_job_1,
    cron_schedule="0 0 * * *",
    execution_timezone="Europe/London",
    name="energy_daily_schedule",
)

# Infrastructure
infrastructure_job_1 = define_asset_job(
    name="infrastructure_job_1",
    selection=[
        "national_charge_point_data_bronze",
        "national_charge_point_data_silver",
    ],
)
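
Both schedules fire at midnight Europe/London ("0 0 * * *" is minute 0, hour 0, every day), so the new entsog_gas_data_bronze asset will refresh daily alongside the carbon intensity assets. A tiny sanity check along these lines can confirm the wiring, assuming ScheduleDefinition exposes its target via the .job property:

# Sketch: confirm each daily schedule targets the intended job
assert energy_job_1_daily.job.name == "energy_job_1"
assert trade_job_1_daily.job.name == "trade_job_1"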