From afd2648649b057558a6c96d173e56d1bdf957785 Mon Sep 17 00:00:00 2001 From: Chris Carlon Date: Sun, 18 Aug 2024 17:55:28 +0100 Subject: [PATCH] updates 2024-08-18 - Minor slack message changes --- .../utils/slack_messages/slack_message.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/analytics_platform_dagster/utils/slack_messages/slack_message.py b/analytics_platform_dagster/utils/slack_messages/slack_message.py index c1b1f82..1c3616f 100644 --- a/analytics_platform_dagster/utils/slack_messages/slack_message.py +++ b/analytics_platform_dagster/utils/slack_messages/slack_message.py @@ -4,12 +4,14 @@ def send_slack_silver_success_message(context, df, asset_name): """ - Function to send a message to Slack following successful completion of an Asset. + Function to send a message to Slack following the successful completion of an Asset. Sends formatted string containing df.info information. + Leverages the AssetExcecuitonContext to access data directly - in this case a Pandas DataFrame. + Args: - context - usuall AssetExecutionContext + context (AssetExecutionContext) df name of the asset @@ -19,18 +21,27 @@ def send_slack_silver_success_message(context, df, asset_name): df.info(buf=buffer) df_info = buffer.getvalue() + # call the postMessage method. context.resources.slack.get_client().chat_postMessage( channel="#pipelines", text=f"{asset_name} successfully processed and stored in Silver Bucket.\n" - f"DataFrame Info:\n```\n{df_info}\n```" + f"Data Overview:\n```\n{df_info}\n```" ) def with_slack_notification(asset_name): """ Wrapper to create a slack message decorator for an asset. + This will only send a message if the return is successful. + + Otherwise the function completes and no message is sent. + + For example (not full code snippet): + + import pandas as pd + from dagster import asset, AssetIn, AssetExecutionContext + from ...utils.slack_messages.slack_message import with_slack_notification - For example: @asset( group_name="energy_assets", io_manager_key="DeltaLake", @@ -43,7 +54,7 @@ def entsog_gas_uk_data_silver(context: AssetExecutionContext, entsog_gas_uk_data etc... """ - def decorator(func): + def slack_df_success_decorator(func): @wraps(func) def wrapper(context, *args, **kwargs): result = func(context, *args, **kwargs) @@ -51,4 +62,4 @@ def wrapper(context, *args, **kwargs): send_slack_silver_success_message(context, result, asset_name) return result return wrapper - return decorator + return slack_df_success_decorator