diff --git a/analytics_platform_dagster/__init__.py b/analytics_platform_dagster/__init__.py index ea0ebce..c147335 100644 --- a/analytics_platform_dagster/__init__.py +++ b/analytics_platform_dagster/__init__.py @@ -1,4 +1,6 @@ -from dagster import Definitions, load_assets_from_modules +from os import getenv +from dagster import Definitions, load_assets_from_modules, RunFailureSensorContext +from dagster_slack import SlackResource, make_slack_on_run_failure_sensor from .assets.location_data_assets import analytics_platform_os_assets from .assets.environment_data_assets import analytics_platform_ea_flood_areas @@ -28,6 +30,11 @@ infrastructure_job_1, ) +slack_failure_sensor = make_slack_on_run_failure_sensor( + slack_token=getenv("SLACKBOT"), + channel="#pipelines", +) + defs = Definitions( assets=load_assets_from_modules( [ @@ -48,15 +55,17 @@ infrastructure_job_1, ], schedules=[energy_job_1_daily, trade_job_1_daily], + sensors=[slack_failure_sensor], resources={ "S3Parquet": S3ParquetManager( - bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5" + bucket_name=getenv("BRONZE_DATA_BUCKET") ), "S3Json": S3JSONManager( - bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5" + bucket_name=getenv("BRONZE_DATA_BUCKET") ), "DeltaLake": AwsWranglerDeltaLakeIOManager( - bucket_name="datastackprod-silverdatabucket04c06b24-mrfdumn6njwe" + bucket_name=getenv("SILVER_DATA_BUCKET") ), + "slack": SlackResource(token=getenv("SLACKBOT")), }, ) diff --git a/analytics_platform_dagster/assets/energy_data_assets/entsog_uk_gas_assets.py b/analytics_platform_dagster/assets/energy_data_assets/entsog_uk_gas_assets.py index c538644..be3326c 100644 --- a/analytics_platform_dagster/assets/energy_data_assets/entsog_uk_gas_assets.py +++ b/analytics_platform_dagster/assets/energy_data_assets/entsog_uk_gas_assets.py @@ -77,6 +77,7 @@ def entsog_gas_uk_data_bronze(context: AssetExecutionContext): io_manager_key="DeltaLake", metadata={"mode": "append"}, ins={"entsog_gas_uk_data_bronze": AssetIn("entsog_gas_uk_data_bronze")}, + required_resource_keys={"slack"} ) def entsog_gas_uk_data_silver( context: AssetExecutionContext, entsog_gas_uk_data_bronze @@ -92,6 +93,16 @@ def entsog_gas_uk_data_silver( df = df.astype(str) context.log.info(f"Success: {df.head(25)}") context.log.info(f"Success: {df.columns}") + + # Send Slack message + context.resources.slack.get_client().chat_postMessage( + channel="#pipelines", + text=f"Hello! This is the Pied Pipeline of Hamelin speaking 🎶\n" + f"ENTSOG Gas UK data successfully processed and stored in Silver bucket.\n" + f"Shape: {df.shape}\n" + f"Data Types:\n{df.dtypes}" + ) + return df except Exception as e: raise e diff --git a/images/Dockerfile.dagster-daemon b/images/Dockerfile.dagster-daemon index 4b04773..826c98f 100644 --- a/images/Dockerfile.dagster-daemon +++ b/images/Dockerfile.dagster-daemon @@ -31,7 +31,8 @@ RUN pip install --no-cache-dir \ fiona \ shapely \ loguru \ - awswrangler + awswrangler \ + dagster-slack # Expose the Dagster daemon port EXPOSE 3000 diff --git a/images/Dockerfile.dagster-webserver b/images/Dockerfile.dagster-webserver index 382db40..24bc1ba 100644 --- a/images/Dockerfile.dagster-webserver +++ b/images/Dockerfile.dagster-webserver @@ -31,7 +31,8 @@ RUN pip install --no-cache-dir \ fiona \ shapely \ loguru \ - awswrangler + awswrangler \ + dagster-slack # Expose the Dagster webserver port EXPOSE 3000