-
Notifications
You must be signed in to change notification settings - Fork 0
/
AjouSlackConsumerAvro.py
105 lines (88 loc) · 3.2 KB
/
AjouSlackConsumerAvro.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import os
import time
from io import BytesIO
from confluent_kafka import Consumer, KafkaError, KafkaException
from fastavro import parse_schema, schemaless_reader, writer
from slack import WebClient
from slack.errors import SlackApiError
# Avro record schema (the in-code equivalent of an .avsc file) for one notice.
# Every field is a plain (name, type) pair, so the "fields" list is generated
# from a compact table; the order of the pairs is the order of the fields.
schema = {
    "namespace": "ajou.parser",
    "name": "Notice",  # kept separate here; the full name is namespace.name
    "doc": "A notice parser from Ajou university.",
    "type": "record",
    "fields": [
        {"name": field_name, "type": field_type}
        for field_name, field_type in (
            ("id", "int"),
            ("title", "string"),
            ("date", "string"),
            ("link", "string"),
            ("writer", "string"),
        )
    ],
}
# Parsed once at start-up and reused for every message that is decoded.
parsed_schema = parse_schema(schema)
# Slack Web API client, authenticated with the Bot User OAuth Access Token
# read from the SLACK_BOT_TOKEN environment variable (KeyError if it is unset).
# Scopes used by the app: channels:history, channels:read, chat:write,
# im:history, mpim:history, users:read
token = os.environ["SLACK_BOT_TOKEN"]
sc = WebClient(token=token)
# Kafka consumer configuration.
# The broker address comes from the VM_SERVER environment variable
# (KeyError if it is unset).
# Set "auto.offset.reset" to "earliest" if you want to consume all messages
# from the beginning of the topic.
settings = {
    "bootstrap.servers": os.environ["VM_SERVER"],
    "group.id": "ajou-notify",
    # Topic-level defaults are given directly in the top-level dict: the
    # nested "default.topic.config" form is deprecated by confluent-kafka.
    # "latest" is the current spelling of the former "largest" value.
    "auto.offset.reset": "latest",
    # "value.deserializer": lambda v: json.loads(v.decode("utf-8")),
    # "debug": "broker, cgrp",
}
c = Consumer(settings)
# Topic: "AJOU-NOTIFY"
c.subscribe(["AJOU-NOTIFY"])
# Main consume loop: poll Kafka, decode each Avro-encoded notice and forward
# it to Slack. Runs until a Kafka error, an unexpected exception or CTRL+C;
# the consumer is always closed on the way out.
try:
    while True:
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = c.poll(1.0)
        if msg is None:
            # Nothing arrived within the timeout; wait before polling again.
            time.sleep(10)
            continue

        # Fetch the error once and branch on it, instead of calling
        # msg.error() repeatedly.
        error = msg.error()
        if error:
            if error.code() == KafkaError._PARTITION_EOF:
                print(
                    "End of partition reached {0}/{1}".format(
                        msg.topic(), msg.partition()
                    )
                )
                continue
            # Any other Kafka error ends the loop via the handler below.
            raise KafkaException(error)

        # NOTE(review): per the confluent-kafka Message docs, len(msg) is the
        # payload size, not a message count; the log text is kept as-is.
        print("Received messages: {0}".format(len(msg)))
        payload = msg.value()
        if payload is None:
            print("But the message value is empty.")
            continue

        # Decode exactly one schemaless Avro record from the payload.
        notice = schemaless_reader(BytesIO(payload), parsed_schema)
        title = notice["title"]
        date = notice["date"]
        href = notice["link"]
        # Named "author" so that the `writer` imported from fastavro at the
        # top of the file is not rebound.
        author = notice["writer"]

        channel = "아주대"  # C01G2CR5MEE
        text = ":star: `%s` 새로운 공지!\n>%s: %s\n>링크: <%s|공지 확인하기>" % (
            date,
            author,
            title,
            href,
        )
        print('\nSending message "%s" to channel %s' % (text, channel))
        try:
            sc.chat_postMessage(
                channel=channel, text=text, as_user=True, username="아주대 공지 봇"
            )  # as_user won't work with new slack app
        except SlackApiError as e:
            # A failed post is reported and skipped so that consumption
            # continues with the next message.
            print("\t** FAILED: %s" % e.response["error"])
except Exception as e:
    # Top-level boundary: report what went wrong, then fall through to cleanup.
    print("{0}: {1}".format(type(e).__name__, e))
except KeyboardInterrupt:
    print("Pressed CTRL+C...")
finally:
    print("Closing...")
    c.close()