-
Notifications
You must be signed in to change notification settings - Fork 10
/
dag.py
62 lines (49 loc) · 1.69 KB
/
dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from datetime import datetime, timedelta
import logging
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from vn_stock.tasks.etl_vndirect_ticker import VNDirectCrawlTicker
from vn_stock.tasks.etl_vndirect_price import VNDirectCrawlPrice
from vn_stock.tasks import config
from vn_stock.tasks.utils import Utils
# Module-level setup: both statements run whenever this file is imported,
# which for a DAG file includes every scheduler parse, not only DAG runs.
# Shared ETL object whose execute_etl is used as the ticker task's callable.
crawl_ticker = VNDirectCrawlTicker(config.conn_string)
# The per-exchange price tasks are generated from this result, so it has to
# be available at parse time.
# NOTE(review): Utils.get_exchange presumably queries the database behind
# config.conn_string on every parse — confirm, and consider caching the
# exchange list if parse time or DB load becomes a problem.
exchanges = Utils.get_exchange(config.conn_string)
# Defaults handed to every operator in this DAG (owner, retry policy,
# start date, ...); individual tasks may override them.
# NOTE(review): 'catchup' is a DAG(...) argument, not an operator argument,
# so it is likely ignored when supplied through default_args — confirm; the
# effective setting is whatever DAG(...) itself receives.
default_args = dict(
    owner='vanducng',
    start_date=datetime(2019, 9, 1),
    depends_on_past=False,
    retries=3,
    retry_delay=timedelta(minutes=5),
    catchup=False,
    email_on_retry=False,
)
def dummy_for_test():
    """Emit a fixed "Hello World" INFO record through the root logger.

    NOTE(review): no task visible in this file uses this callable; it looks
    like a leftover smoke-test helper — confirm before removing.
    """
    greeting = "Hello World"
    logging.info(greeting)
# The pipeline DAG. start_date in default_args is a naive datetime, so
# Airflow interprets it (and this cron schedule) in its configured default
# timezone, which is UTC unless overridden in airflow.cfg.
dag = DAG(
    "vn_stock.etl",
    default_args=default_args,
    description='Scrap data from stock website',
    # Run at 08:00 and 20:00, intended to bracket the exchange's open and
    # close.
    # NOTE(review): confirm the scheduler timezone lines up with the
    # exchange's trading hours.
    schedule_interval="0 8,20 * * *",
    # catchup must be passed to DAG itself; as a default_args entry it is not
    # applied. Without this, the scheduler would create every missed run
    # since start_date (unless catchup_by_default is disabled globally).
    catchup=False,
    # Never let two runs of this ETL overlap.
    max_active_runs=1
)
# No-op marker tasks that bracket the pipeline, created begin-then-end.
start_operator, end_operator = (
    DummyOperator(task_id=marker_id, dag=dag)
    for marker_id in ('begin_execution', 'end_execution')
)
# Ticker ETL task: invokes the bound execute_etl of the module-level
# crawl_ticker instance. It sits upstream of every per-exchange price task.
ticker_ingestion = PythonOperator(
    dag=dag,
    task_id="ticker_ingestion",
    python_callable=crawl_ticker.execute_etl,
)
# One price-crawl task per row of `exchanges` (presumably a pandas DataFrame,
# given iterrows()). The exchange code doubles as the task_id and is passed
# as the single positional argument to execute_etl. Each task gets its own
# VNDirectCrawlPrice instance, constructed here at parse time.
price_ingestion_list = [
    PythonOperator(
        task_id=row["exchange"],
        python_callable=VNDirectCrawlPrice(config.conn_string).execute_etl,
        op_args=[row["exchange"]],
        dag=dag,
    )
    for _, row in exchanges.iterrows()
]
# Wire the DAG: begin -> ticker ingestion -> all per-exchange price tasks
# (in parallel) -> end. The operator list is used directly; copying it is
# unnecessary. If no exchanges were found, link ticker ingestion straight
# to the end marker: chaining through an empty list would leave
# end_execution with no upstream tasks at all.
start_operator >> ticker_ingestion
if price_ingestion_list:
    ticker_ingestion >> price_ingestion_list >> end_operator
else:
    ticker_ingestion >> end_operator