backup_archivelog.py
from datetime import datetime, timedelta
import pendulum
# The DAG object used to define the workflow
from airflow import DAG
# SSHHook and SSHOperator for running remote commands over SSH
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
# MS Teams webhook operator (local plugin) for failure notifications
from ms_teams_webhook_operator import MSTeamsWebhookOperator
# Functions
def on_failure(context):
    """Send an MS Teams notification with a link to the task log when a task fails."""
    dag_id = context['dag_run'].dag_id
    task_id = context['task_instance'].task_id
    context['task_instance'].xcom_push(key=dag_id, value=True)
    logs_url = "https://vnhqmgrdb01.hcnet.vn:8080/log?dag_id={}&task_id={}&execution_date={}".format(
        dag_id, task_id, context['ts'])
    teams_notification = MSTeamsWebhookOperator(
        task_id="msteams_notify_failure", trigger_rule="all_done",
        message="**Airflow notification**: `{}` has failed on task: `{}`".format(dag_id, task_id),
        button_text="View log", button_url=logs_url,
        theme_color="FF0000", http_conn_id='msteams_webhook_url')
    teams_notification.execute(context)
# Define SSHHook
sshHook = SSHHook(ssh_conn_id="dbgenesys03_ssh")
# Define DAG
with DAG(
    "dbgenesys03_backup_archivelog",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "on_failure_callback": on_failure,
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="dbgenesys03 backup archivelog",
    start_date=pendulum.datetime(2023, 6, 14, tz="Asia/Saigon"),
    schedule_interval='30 12,21 * * *',
    catchup=False,
    tags=["dbgenesys03", "genesys"],
) as dag:
    command_01 = "/home/oracle/bin/main_backup_archivelog.sh gsvn1 > /dev/null 2>&1 "
    # Tasks
    t1 = SSHOperator(
        task_id="backup_archivelog",
        command=command_01,
        ssh_hook=sshHook,
        cmd_timeout=None,
    )
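
# Assumption (not part of the original file): a minimal sketch of how this DAG could be
# exercised locally for debugging. DAG.test() is available in Airflow 2.5+; on older
# versions, `airflow dags test dbgenesys03_backup_archivelog` from the CLI serves the
# same purpose.
# if __name__ == "__main__":
#     dag.test()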