Skip to content

Commit

Permalink
Updates 2024-08-16 - Added Slack Integration
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Aug 16, 2024
1 parent 4ce2766 commit b4569bb
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 6 deletions.
17 changes: 13 additions & 4 deletions analytics_platform_dagster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from dagster import Definitions, load_assets_from_modules
from os import getenv
from dagster import Definitions, load_assets_from_modules, RunFailureSensorContext
from dagster_slack import SlackResource, make_slack_on_run_failure_sensor

from .assets.location_data_assets import analytics_platform_os_assets
from .assets.environment_data_assets import analytics_platform_ea_flood_areas
Expand Down Expand Up @@ -28,6 +30,11 @@
infrastructure_job_1,
)

slack_failure_sensor = make_slack_on_run_failure_sensor(
slack_token=getenv("SLACKBOT"),
channel="#pipelines",
)

defs = Definitions(
assets=load_assets_from_modules(
[
Expand All @@ -48,15 +55,17 @@
infrastructure_job_1,
],
schedules=[energy_job_1_daily, trade_job_1_daily],
sensors=[slack_failure_sensor],
resources={
"S3Parquet": S3ParquetManager(
bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5"
bucket_name=getenv("BRONZE_DATA_BUCKET")
),
"S3Json": S3JSONManager(
bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5"
bucket_name=getenv("BRONZE_DATA_BUCKET")
),
"DeltaLake": AwsWranglerDeltaLakeIOManager(
bucket_name="datastackprod-silverdatabucket04c06b24-mrfdumn6njwe"
bucket_name=getenv("SILVER_DATA_BUCKET")
),
"slack": SlackResource(token=getenv("SLACKBOT")),
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def entsog_gas_uk_data_bronze(context: AssetExecutionContext):
io_manager_key="DeltaLake",
metadata={"mode": "append"},
ins={"entsog_gas_uk_data_bronze": AssetIn("entsog_gas_uk_data_bronze")},
required_resource_keys={"slack"}
)
def entsog_gas_uk_data_silver(
context: AssetExecutionContext, entsog_gas_uk_data_bronze
Expand All @@ -92,6 +93,16 @@ def entsog_gas_uk_data_silver(
df = df.astype(str)
context.log.info(f"Success: {df.head(25)}")
context.log.info(f"Success: {df.columns}")

# Send Slack message
context.resources.slack.get_client().chat_postMessage(
channel="#pipelines",
text=f"Hello! This is the Pied Pipeline of Hamelin speaking 🎶\n"
f"ENTSOG Gas UK data successfully processed and stored in Silver bucket.\n"
f"Shape: {df.shape}\n"
f"Data Types:\n{df.dtypes}"
)

return df
except Exception as e:
raise e
3 changes: 2 additions & 1 deletion images/Dockerfile.dagster-daemon
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ RUN pip install --no-cache-dir \
fiona \
shapely \
loguru \
awswrangler
awswrangler \
dagster-slack

# Expose the Dagster daemon port
EXPOSE 3000
Expand Down
3 changes: 2 additions & 1 deletion images/Dockerfile.dagster-webserver
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ RUN pip install --no-cache-dir \
fiona \
shapely \
loguru \
awswrangler
awswrangler \
dagster-slack

# Expose the Dagster webserver port
EXPOSE 3000
Expand Down

0 comments on commit b4569bb

Please sign in to comment.