Skip to content

Commit

Permalink
Adds tweet integration
Browse files Browse the repository at this point in the history
  • Loading branch information
kammradt committed Nov 29, 2021
1 parent 7312757 commit 0bea8e8
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 3 deletions.
2 changes: 1 addition & 1 deletion infra/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apache-airflow==2.1.3
apache-airflow[postgres]
tweepy==3.10.0
tweepy==4.3.0
psycopg2-binary==2.9.1
Binary file removed src/dags/__pycache__/dag-random-card.cpython-38.pyc
Binary file not shown.
Binary file not shown.
Binary file removed src/dags/__pycache__/dag-random-card_.cpython-38.pyc
Binary file not shown.
39 changes: 37 additions & 2 deletions src/dags/dag-random-card.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
import tweepy
import os


@dataclass
Expand All @@ -33,10 +36,35 @@ def print_card_info(card: Card):
print(f'👇 You can download the image here: {card.url}')


def get_authenticated_client() -> tweepy.API:
api_key = Variable.get('api_key')
api_key_secret = Variable.get('api_key_secret')
access_token = Variable.get('access_token')
access_token_secret = Variable.get('access_token_secret')

auth = tweepy.OAuthHandler(api_key, api_key_secret)
auth.set_access_token(access_token, access_token_secret)

return tweepy.API(auth)

def tweet_card(card: Card):
client = get_authenticated_client()
filename = 'card_image.png'

with open(filename, 'wb') as handler:
# Baixa a imagem
handler.write(get(card.url, allow_redirects=True).content)
#Faz o tweet
tweet_text = f'A carta aleatória de agora é: {card.name}, com valor de ${card.price}'
client.update_with_media(filename, tweet_text)
#Exclui o arquivo da imagem
os.remove(filename)


with DAG(
dag_id='random_card_dag',
default_args=dict(retries=3, start_date=days_ago(7)),
schedule_interval=None,
schedule_interval='0 * * * *',
catchup=False
) as dag:
start = DummyOperator(task_id='Start')
Expand All @@ -60,6 +88,13 @@ def print_card_info(card: Card):
params=asdict(card),
)

tweet = PythonOperator(
task_id='tweet',
python_callable=tweet_card,
op_args=[card]
)


end = DummyOperator(task_id='End')

start >> print_card_info >> create_table_cards >> insert_card >> end
start >> print_card_info >> create_table_cards >> insert_card >> tweet >>end

0 comments on commit 0bea8e8

Please sign in to comment.