Skip to content

Commit

Permalink
updates 2024-08-18 - Minor slack message changes
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Aug 18, 2024
1 parent 9270994 commit afd2648
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions analytics_platform_dagster/utils/slack_messages/slack_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

def send_slack_silver_success_message(context, df, asset_name):
"""
Function to send a message to Slack following successful completion of an Asset.
Function to send a message to Slack following the successful completion of an Asset.
Sends formatted string containing df.info information.
Leverages the AssetExcecuitonContext to access data directly - in this case a Pandas DataFrame.
Args:
context - usuall AssetExecutionContext
context (AssetExecutionContext)
df
name of the asset
Expand All @@ -19,18 +21,27 @@ def send_slack_silver_success_message(context, df, asset_name):
df.info(buf=buffer)
df_info = buffer.getvalue()

# call the postMessage method.
context.resources.slack.get_client().chat_postMessage(
channel="#pipelines",
text=f"{asset_name} successfully processed and stored in Silver Bucket.\n"
f"DataFrame Info:\n```\n{df_info}\n```"
f"Data Overview:\n```\n{df_info}\n```"
)

def with_slack_notification(asset_name):
"""
Wrapper to create a slack message decorator for an asset.
This will only send a message if the return is successful.
Otherwise the function completes and no message is sent.
For example (not full code snippet):
import pandas as pd
from dagster import asset, AssetIn, AssetExecutionContext
from ...utils.slack_messages.slack_message import with_slack_notification
For example:
@asset(
group_name="energy_assets",
io_manager_key="DeltaLake",
Expand All @@ -43,12 +54,12 @@ def entsog_gas_uk_data_silver(context: AssetExecutionContext, entsog_gas_uk_data
etc...
"""
def decorator(func):
def slack_df_success_decorator(func):
@wraps(func)
def wrapper(context, *args, **kwargs):
result = func(context, *args, **kwargs)
if isinstance(result, pd.DataFrame):
send_slack_silver_success_message(context, result, asset_name)
return result
return wrapper
return decorator
return slack_df_success_decorator

0 comments on commit afd2648

Please sign in to comment.