-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.py
47 lines (41 loc) · 1.19 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from redis import Redis
import psycopg2
import json
redis = Redis(host='redis', port=6379)
def process_user_data(user):
try:
conn = psycopg2.connect(
host="db",
port=5432,
database="postgres",
user="postgres",
password="postgres"
)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO users (id, first_name, last_name, email, gender, ip_address, user_name, agent, country)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
user['id'],
user['first_name'],
user['last_name'],
user['email'],
user['gender'],
user['ip_address'],
user['user_name'],
user['agent'],
user['country']
))
conn.commit()
cursor.close()
conn.close()
except psycopg2.Error as e:
print("Error inserting user:", e)
def process_user_queue():
while True:
_, user_data = redis.blpop('user_queue')
user_list = json.loads(user_data)
for user in user_list:
process_user_data(user)
if __name__ == '__main__':
process_user_queue()