-
Notifications
You must be signed in to change notification settings - Fork 0
/
twitter_generator.py
49 lines (36 loc) · 1.65 KB
/
twitter_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from kafka import KafkaProducer
import tweepy
import os
import time
from dotenv import load_dotenv
# Load variables from a local .env file (if one exists) into the process
# environment before any configuration is read below.
load_dotenv()

# Twitter API v2 bearer token: the only credential handed to tweepy.Client.
TWITTER_BEARER_TOKEN = os.getenv('TWITTER_BEARER_TOKEN')
if not TWITTER_BEARER_TOKEN:
    # Fail fast with an actionable message instead of starting the loop and
    # failing on the first Twitter API request.
    raise RuntimeError(
        "TWITTER_BEARER_TOKEN is not set; define it in the environment or in a .env file"
    )
# Seconds to sleep between polling rounds.
TWITTER_INTERVAL_SECONDS = int(os.getenv('TWITTER_INTERVAL_SECONDS', 60))
# Kafka bootstrap server as "host:port".
# NOTE(review): 0.0.0.0 is normally a listen address; as a client target it
# usually ends up meaning the local host — consider "localhost:9092".
KAFKA_SERVER = os.getenv("KAFKA_SERVER", '0.0.0.0:9092')
# Topic that receives raw tweet texts for the word-count stream.
WORDS_TOPIC = os.getenv("WORDS_TOPIC", "wc")
# Topic that receives "<candidate>,<tweet text>" records.
ELECTION_TOPIC = os.getenv("ELECTION_TOPIC", "election")

# Shared clients, created once at import time.
producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER)
twitter_client = tweepy.Client(bearer_token=TWITTER_BEARER_TOKEN)
def on_send_success(record_metadata):
    """Report a successful Kafka delivery.

    Attached with ``add_callback`` to each ``producer.send`` future. Prints
    the record's topic and then its assigned offset, each on its own line.
    """
    for field in (record_metadata.topic, record_metadata.offset):
        print(field)
def on_send_error(excp):
    """Report a failed Kafka delivery.

    Attached with ``add_errback`` to each ``producer.send`` future; receives
    the exception describing why the send failed.

    Args:
        excp: the exception instance passed by the producer future.
    """
    import logging  # function-scope import keeps this fix self-contained

    # print() has no ``exc_info`` keyword (it raised TypeError here and hid the
    # real error); logging accepts an exception instance and renders its
    # traceback alongside the message.
    logging.getLogger(__name__).error('I am an error callback', exc_info=excp)
# Candidates tracked for the election topic; each name is used verbatim as a
# Twitter search query.
candidates = ['Lula', 'Bolsonaro', 'Simone Tebet', 'Ciro Gomes']


def _search_texts(query):
    """Return the texts of up to 10 recent tweets matching *query*.

    Returns an empty list when the search has no results (tweepy leaves
    ``Response.data`` as ``None`` then) or when the Twitter API call raises,
    so a single bad request does not stop the polling loop.
    """
    try:
        response = twitter_client.search_recent_tweets(query=query, max_results=10)
    except tweepy.TweepyException as exc:
        print(f"Twitter search for {query!r} failed: {exc}")
        return []
    return [tweet.text for tweet in response.data or []]


def _publish(topic, messages):
    """Send every string in *messages* to Kafka *topic*, then flush once."""
    for message in messages:
        producer.send(topic, message.encode()).add_callback(on_send_success).add_errback(on_send_error)
    # A single flush per batch waits for all queued records to complete,
    # instead of forcing a blocking round-trip after every record.
    producer.flush()


while True:
    # send to word count topic
    _publish(WORDS_TOPIC, _search_texts("covid"))
    # send to candidate topic; each record is "<candidate>,<tweet text>"
    for candidate in candidates:
        _publish(
            ELECTION_TOPIC,
            [f'{candidate},{sentence}' for sentence in _search_texts(candidate)],
        )
    print(f"Sleeping for {TWITTER_INTERVAL_SECONDS} seconds")
    time.sleep(TWITTER_INTERVAL_SECONDS)