-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpoll.py
224 lines (191 loc) · 8.69 KB
/
poll.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import re
from mautrix.types import (EventType, ReactionEvent, RedactionEvent)
from maubot import Plugin, MessageEvent
from maubot.handlers import command, event
import logging
log = logging.getLogger("maubot.client")
POLL_REGEX = r"\"((?:.|\n)*?)\"|^(.+)$"
EMOJI_REGEX = r"^(?:[\u2600-\u26FF\u2700-\u27BF\U0001F300-\U0001F5FF\U0001F600-\U0001F64F\U0001F680-\U0001F6FF\U0001F900-\U0001F9FF]|\d\ufe0f\u20e3)"
DEFAULT_REACTIONS = [f"{n}\ufe0f\u20e3" for n in range(1,10)]
class Choice:
def __init__(self, text, emoji):
self.text = text
self.emoji = emoji
self.count = 0
class Vote:
def __init__(self, choice, user_id, event_id):
self.choice = choice
self.user_id = user_id
self.event_id = event_id
class Poll:
@classmethod
def parse(cls, poll_setup, author=None):
setup = re.findall(POLL_REGEX, poll_setup, re.MULTILINE)
if len(setup) < 3:
raise ValueError("Not enough arguments supplied (three at least for the question and two choices).")
else:
# Chose the correct capturing group since the regex has two capturing groups for the quote sign and newline syntax:
question = setup[0][0] if setup[0][0] != '' else setup[0][1]
choices = [choice[0] if choice[0] != '' else choice[1] for choice in setup[1:]]
return cls(question, choices, author)
def __init__(self, question, choices, author=None):
self.question = question
self.choices = []
self.votes = []
self.author = author
emojis = []
for i, choice in enumerate(choices):
choice_trimmed = choice.strip()
x = re.match(EMOJI_REGEX, choice)
if x:
emoji = choice_trimmed[:x.span()[1]]
choice_trimmed = choice_trimmed[x.span()[1]:].strip()
log.debug(f"Extracted emoji '{emoji}' for choice '{choice_trimmed}'")
else:
emoji = None
choices[i] = choice_trimmed
emojis.append(emoji)
default_reactions_filtered = list(set(DEFAULT_REACTIONS).difference(set(emojis)))
default_reactions_filtered.sort(reverse=True)
# ToDo: what if more choices without user supplied emojis than default_reactions?
for i, choice in enumerate(choices):
self.choices.append(Choice(choice, emojis[i] if emojis[i] else default_reactions_filtered.pop()))
def vote(self, reaction, user_id, event_id):
log.debug(f"Vote for {reaction} from '{user_id}' (event ID: {event_id})")
self.votes.append(Vote(self.get_choice(reaction), user_id, event_id))
def unvote(self, redact_id):
vote = self.get_vote_by_event(redact_id)
if vote:
log.debug(f"Vote withdrawn ({vote.choice.emoji} from '{vote.user_id}')")
self.votes.remove(vote)
def get_vote_by_event(self, event_id):
for vote in self.votes:
if vote.event_id == event_id:
return vote
return None
def get_choice(self, reaction):
for choice in self.choices:
if choice.emoji == reaction:
return choice
return None
def get_poll(self):
choice_list = " \n".join(
[f"{choice.emoji}: {choice.text}" for choice in self.choices]
)
return f"""Poll created by <a href="https://matrix.to/#/{self.author}">{self.author}</a> (ID: {self.index+1})
**{self.question}**
{choice_list}"""
def count(self):
# Rest choice count:
for choice in self.choices:
choice.count = 0
# Count votes:
voters = set()
for vote in self.votes:
vote.choice.count += 1
voters.add(vote.user_id)
self.voters_count= len(voters)
def get_results(self):
self.count()
votes_count = len(self.votes)
results = " \n".join([f"{choice.emoji} {choice.count}/{self.voters_count} : {choice.text} " for choice in self.choices])
results=f"""# Poll results
**{self.question}**
({self.voters_count} unique voters voted {votes_count} times)
{results}
"""
return results
class PollBot(Plugin):
currentPolls = {}
@command.new("pollhelp")
async def poll_help(self, evt: MessageEvent):
log.debug(evt.sender)
helptext = f"""Hey <a href=\"https://matrix.to/#/{evt.sender}\">{evt.sender}</a>,
I'm here to help you with polls. A poll is usually a message from me sent at your request (`!poll` command).
I also react to poll messages with 🔢 emojis representing the different choices. You can assign emojis to choices yourself.
Users can vote by reacting to a poll message with the corresponding emoji. Enforcing single choice is not supported.
Results can be seen by using the `!pollresults` command or by viewing the reactions of a poll message.
# Create a poll
... using quotes: `!poll "Question" "choice 1" "choice 2" ...`
or by using newlines between question and choices:
```
!poll Question
choice 1
choice 2
...
```
If the first character of a choice is an emoji (i.e. `!poll "How are you?" "👍️Good" "👎️Bad"`) it will be used for voting instead a default one.
The `!lightpoll` command will also create a poll but will use your message directly as the poll message instead of replying.
# View results
Just send `!pollresults` and I will reply with the results of the latest poll.
If you append the ID of the poll (look at the poll message) to the command (i.e. `!pollresults 1`) you can get results of older polls.
"""
await evt.reply(helptext, allow_html=True)
async def create_poll(self, evt, poll_setup):
try:
poll = Poll.parse(poll_setup, evt.sender)
except ValueError:
response = "You need to enter at least 2 choices. **For help send `!pollhelp`**"
await evt.reply(response)
return None
else:
if evt.room_id not in self.currentPolls:
self.currentPolls[evt.room_id] = []
poll_index = len(self.currentPolls[evt.room_id])
poll.index = poll_index
self.currentPolls[evt.room_id].append(poll)
return poll
@command.new("poll", help='Creates a new poll.')
@command.argument("poll_setup", pass_raw=True, required=True)
async def poll_handler(self, evt: MessageEvent, poll_setup: str) -> None:
poll = await self.create_poll(evt, poll_setup)
if poll:
response = poll.get_poll()
poll.event_id = await evt.reply(response, allow_html=True)
for choice in poll.choices:
await evt.client.react(evt.room_id, poll.event_id, choice.emoji)
@command.new("lightpoll", help='Creates a poll directly from the message.')
@command.argument("poll_setup", pass_raw=True, required=True)
async def lightpoll_handler(self, evt: MessageEvent, poll_setup: str) -> None:
poll = await self.create_poll(evt, poll_setup)
if poll:
poll.event_id = evt.event_id
for choice in poll.choices:
await evt.client.react(evt.room_id, poll.event_id, choice.emoji)
@command.new("pollresults", help='Shows results for current poll.')
@command.argument("poll_id", required=False)
async def pollresults_handler(self, evt: MessageEvent, poll_id=None) -> None:
await evt.mark_read()
if evt.room_id in self.currentPolls:
if poll_id == '':
index = -1
else:
try:
poll_id = int(poll_id)
except:
await evt.reply("Malformed ID not known.")
return
else:
if poll_id > 0 and poll_id <= len(self.currentPolls[evt.room_id]):
index = poll_id - 1
else:
await evt.reply("Poll ID not known.")
return
response = self.currentPolls[evt.room_id][index].get_results()
await evt.reply(response)
else:
await evt.reply("No active polls in this room.")
return
@event.on(EventType.REACTION)
async def get_react_vote(self, evt: ReactionEvent) -> None:
if evt.sender != self.client.mxid:
if evt.room_id in self.currentPolls:
for poll in self.currentPolls[evt.room_id]:
if evt.content.relates_to.event_id == poll.event_id:
poll.vote(evt.content.relates_to.key, evt.sender, evt.event_id)
break
@event.on(EventType.ROOM_REDACTION)
async def get_redact_vote(self, evt: RedactionEvent) -> None:
if evt.room_id in self.currentPolls:
for poll in self.currentPolls[evt.room_id]:
poll.unvote(evt.redacts)