-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhandlers.py
375 lines (272 loc) · 11.8 KB
/
handlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# coding=utf-8
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import ConversationHandler, Updater
import logging
import urllib.request
import os, sys
from Cam import MainClass
from utils import add_id, elegible_user, read_token_psw
# Bot token and access password, as returned by utils.read_token_psw().
TOKEN, psw = read_token_psw()
# Both values are secrets, so they are deliberately NOT echoed to stdout:
# whatever is printed here usually ends up in terminals and service logs.
updater = Updater(TOKEN)
disp = updater.dispatcher
logger = logging.getLogger('motionlog')
# Camera controller from the project's Cam module; it is handed the updater.
cam = MainClass(updater)
# Inline keyboard attached to the flags message. Each button sends back
# "/flag <key>" as callback data; rows are listed top to bottom.
FLAG_KEYBOARD = InlineKeyboardMarkup([
    [
        InlineKeyboardButton(label, callback_data="/flag " + key)
        for label, key in row
    ]
    for row in (
        (("Motion Detection", "motion"), ("Video", "face_video"), ("Movement Square", "square")),
        (("Face Photo", "face_photo"), ("Face Reco", "face_reco"), ("Debug", "debug")),
        (("Done", "done"),),
    )
])
# HTML-formatted explanation of every flag. complete_flags() appends one
# status line per flag to this text before it is sent with parse_mode="HTML".
FLAG_SEND = """
Here you can set the values of your flags, either <b>ON</b> or <b>OFF</b>
-- <b>Motion Detection</b> : If set to <i>ON</i> the bot will notify, both with a message and with a video, you when a movement has been detected
---- <b>Video</b> : If set to <i>ON</i> the video you recieve from the <i>Motion Detection</i> above will highlith faces
---- <b>Movement square</b> : If set to <i>ON</i> the video will present green squares where the movement has been detected, (set it to off for faster computation)
---- <b>Face Photo</b> : If set to <i>ON</i> you will recieve a photo of the detected face with the video
---- <b>Face Reco(gnizer)</b> : If set to <i>ON</i> the program will try to guess the person face
-- <b>Debug</b> : If set to <i>ON</i> you will recieve the images from the debug
To set a flag just click on the corrispondent button.
Note that <b>Face Photo</b> depends on <b>Face Video</b> which depends on <b>Motion Detection</b>, so unless this last on is set <b>ON</b> the other won't work
Current value are the following :"""
# ===============Callback, Conversation===================
def annulla(bot, update):
    """Conversation fallback: reply "Error" and close the conversation.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: incoming update whose message is answered
    :return: ConversationHandler.END
    """
    incoming = update.message
    incoming.reply_text("Error")
    return ConversationHandler.END
def flag_setting_callback(bot, update):
    """React to a press on one of the FLAG_KEYBOARD buttons.

    The callback data has the form "/flag <key>". "done" deletes the flags
    message; every other known key flips the matching attribute on
    ``cam.motion`` and the message is then rewritten with the new values.

    :param bot: bot used to answer the query and to edit/delete the message
    :param update: update carrying the callback query
    :return: None
    """
    query = update.callback_query
    # Telegram clients keep showing a progress indicator on the pressed button
    # until the callback query is answered, so acknowledge it right away.
    bot.answer_callback_query(query.id)

    tokens = query.data.split()
    if len(tokens) < 2:
        # Malformed callback data: there is no flag key to act on.
        return
    param = tokens[1]
    message = query.message

    if param == "done":
        bot.delete_message(
            chat_id=message.chat_id,
            message_id=message.message_id
        )
        return

    # Button key -> name of the boolean attribute on cam.motion that it flips.
    toggles = {
        "face_video": "video_flag",
        "square": "green_squares",
        "face_photo": "face_photo_flag",
        "debug": "debug_flag",
        "face_reco": "face_reco_falg",
    }
    motion = cam.motion

    if param == "motion":
        motion.motion_flag = not motion.motion_flag
        if not motion.motion_flag:
            # The other detection features depend on motion detection, so
            # they are switched off together with it.
            motion.video_flag = False
            # NOTE(review): this clears `faces_video_flag`, while the
            # "Face Photo" button toggles `face_photo_flag` - confirm that
            # both attributes exist and that this is the intended one.
            motion.faces_video_flag = False
            motion.face_reco_falg = False
            motion.green_squares = False
    elif param in toggles:
        attribute = toggles[param]
        setattr(motion, attribute, not getattr(motion, attribute))
    else:
        # Unknown key: no flag changed, so the message text would be
        # identical and there is nothing to update.
        return

    bot.edit_message_text(
        chat_id=message.chat_id,
        text=complete_flags(),
        message_id=message.message_id,
        parse_mode="HTML",
        reply_markup=FLAG_KEYBOARD
    )
def start(bot, update):
    """Greet the user and ask for the bot password.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: incoming update whose message is answered
    :return: 1 - presumably the conversation state that waits for the
        password; confirm against the ConversationHandler setup
    """
    greeting = "Welcome... to start insert the password"
    update.message.reply_text(greeting)
    return 1
def get_psw(bot, update):
    """Check the password typed by the user and record the outcome.

    The message text is compared with the module-level ``psw``. The sender's
    id is stored through ``add_id`` with 1 on a match and 0 otherwise.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: update whose message text is the password attempt
    :return: ConversationHandler.END, so the password step is closed after a
        single answer instead of treating every later text message as a new
        attempt (a handler that returns None leaves the state unchanged)
    """
    message = update.message
    user_id = message.from_user.id

    if message.text == psw:
        message.reply_text("Correct password!")
        add_id(user_id, 1)
    else:
        message.reply_text("Incorrect password...you can not accesst this bot functionalities anymore :(")
        add_id(user_id, 0)

    # Presumably reloads the stored ids so the handler sees the entry written
    # by add_id above - confirm against the telegram handler implementation.
    cam.telegram_handler.get_ids(cam.telegram_handler.default_id)
    return ConversationHandler.END
# ===============Commands===================
@elegible_user
def flag_setting_main(bot, update):
    """Send the flags overview together with the keyboard used to toggle them.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: incoming update whose message is answered
    """
    update.message.reply_text(
        complete_flags(),
        parse_mode="HTML",
        reply_markup=FLAG_KEYBOARD,
    )
@elegible_user
def reset_ground(bot, update):
    """Reset the ground truth image (the background) on request of a user.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: incoming update; its sender is named in the reset reason
    """
    message = update.message
    user = message.from_user

    # A Telegram account does not need to have a username; in that case the
    # attribute is None and concatenating it would raise TypeError.
    if user.username:
        requester = "@" + user.username
    else:
        requester = user.first_name or str(user.id)

    message.reply_text("Resetting ground image")
    cam.motion.reset_ground("Reset ground asked from " + requester)
    message.reply_text("Ground image has been reset")
@elegible_user
def get_camshot(bot, update):
    """Capture a single image from the camera and send it to the requester.

    The picture is written to a per-user temporary file that is removed again
    once the send attempt is over, whether it succeeded or not.

    :param bot: bot used to send the photo
    :param update: incoming update; its sender receives the photo
    """
    user_id = update.message.from_user.id
    image = "image_" + str(user_id) + ".png"

    ret = cam.capture_image(image)
    logger.info("photo command called")

    if not ret:
        update.message.reply_text("There has been an error...please retry in a few seconds")
        return

    try:
        with open(image, "rb") as file:
            bot.sendPhoto(user_id, file)
    finally:
        # Clean up even when opening or sending fails, so failed requests do
        # not leave image files behind.
        if os.path.exists(image):
            os.remove(image)
@elegible_user
def stream(bot, update, args):
    """Record a video from the camera for the requested number of seconds.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: incoming update; its sender id is passed to the capture
    :param args: command arguments; empty for the default duration,
        otherwise exactly one integer number of seconds
    """
    print("Video")
    logger.info("video command called")

    max_seconds = 20
    default_seconds = 5

    if not args:
        seconds = default_seconds
    elif len(args) != 1:
        update.message.reply_text("You must provide just ONE number for the seconds")
        return
    else:
        try:
            seconds = int(args[0])
        except ValueError:
            update.message.reply_text("You did not provide aright number")
            return
        if seconds <= 0:
            # A zero or negative duration is not a usable request either.
            update.message.reply_text("You did not provide aright number")
            return
        if seconds > max_seconds:
            update.message.reply_text("The maximum seconds is " + str(max_seconds) + "...setting deafult 5s")
            seconds = default_seconds

    user_id = update.message.from_user.id
    video_name = "video_" + str(user_id) + ".mp4"
    update.message.reply_text("Wait " + str(seconds) + " seconds...")
    cam.capture_video(video_name, seconds, user_id)
    logger.info("Sending a " + str(seconds) + " seconds video")
    print("Capture complete")
@elegible_user
def stop_execution(bot, update):
    """Announce the shutdown, stop the camera and exit.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: incoming update (unused)
    """
    logger.info("stop command called")
    notifier = cam.telegram_handler
    notifier.send_message("Stopping surveillance")
    cam.stop()
    logger.info("Stopping execution")
    # Raises SystemExit with status 0 from inside the handler.
    sys.exit(0)
@elegible_user
def send_log(bot, update):
    """Send the motion log file to the chat that asked for it.

    Replies "No log file detected!" when Resources/motion.log cannot be
    opened because it (or its directory) does not exist.

    :param bot: bot used to send the document
    :param update: incoming update; its chat receives the log
    """
    logger.info("send log command called")

    # Open the file directly instead of listing the directory first: this
    # avoids the gap between the check and the open, and also covers a
    # missing Resources/ directory.
    try:
        log_file = open("Resources/motion.log", "rb")
    except FileNotFoundError:
        update.message.reply_text("No log file detected!")
        return

    with log_file:
        bot.sendDocument(update.message.chat_id, log_file)
@elegible_user
def delete_log(bot, update):
    """Delete the motion log file and leave a placeholder file in its place.

    Replies "Log deleted" when the file was removed and
    "No log file detected!" when there was nothing to remove.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: incoming update whose message is answered
    """
    logger.info("delete log command called")

    # Remove directly and handle the missing-file case, rather than listing
    # the directory first and racing against whatever else touches the log.
    try:
        os.remove("Resources/motion.log")
    except FileNotFoundError:
        update.message.reply_text("No log file detected!")
    else:
        update.message.reply_text("Log deleted")

    # Recreate the log with a single space in it; presumably so that a log
    # file always exists and is never empty - confirm with the log users.
    with open("Resources/motion.log", "w+") as file:
        file.write(" ")
@elegible_user
def send_ground(bot, update):
    """Send the current background image to the user who asked for it.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: incoming update; its sender id is the recipient
    """
    logger.info("ground command called")
    print("Sending ground...")
    recipient = update.message.from_user.id
    cam.motion.send_ground(recipient, "Current background image")
    print("...Done")
@elegible_user
def help_bot(bot, update):
    """Reply with the HTML-formatted help text listing commands and features.

    :param bot: bot instance passed by the dispatcher (unused)
    :param update: incoming update whose message is answered
    """
    # The text lines start at column 0 on purpose: leading whitespace inside
    # the literal would become part of the message.
    help_text = """
Welcome to this bot!
You can use it to with a camera to create your own surveillance system.
The avaiable commands are the following:
- /start - start bot
- /photo : get a camshot from the camera
- /video seconds : get a video from the camera with <i>seconds</i> duration, the default duration is 5 seconds
- /flags : set the flags
- /resetg : reset the backgroud image
- /stop : stop surveillance execution
- /logsend : send the logger file
- /logdel : delete the log file
- /bkground : send the background image
- /classify : classify the person face
This bot has multiple functionalities:
<b>==Movement detection==</b>
When there is a detected change between the background image and the current frame from the camera you will be notified with a message.
You can set the flags (try the /flags command) to get multiple information from the camera, such as the video of the movement, the detected faces in the video and so on.
<b>==Camera shotter==</b>
You can use the commands /photo and /video to get a <b>live</b> update of what the camera is seeing
<b>==Telegram access==</b>
When a new user starts the bot it will be asked for the password. This password can be set from the source code <b>ONLY</b> and you will have <b>just one chance</b> to get it right.
If you fail the bot will block you and no commands will be executed. Otherwise you will be granted full access to the bot functionalities.
The bot password can be set in the <i>Resources/token_psw.txt</i> file (check out the README).
When a movement is detected, depending on the flags values, every id in the ids file will be notified (if not blocked).
<b>==Face recognizer==</b>
If the proper falg is set to True and a face has been detected in the video, the face recognizer will try to guess the person whose face has been seen.
When the bot first starts it will train the face recognizer with the saved faces. To save a new faces simply use the /classify command and follow th instructions.
The more faces the recognizer find out the more precise it will be
Thank you for choosing this bot, I hope you like it.
"""
    update.message.reply_text(help_text, parse_mode="HTML")
@elegible_user
def predict_face(bot, update):
    """Run the face predictor on the photo attached to the incoming message.

    The last entry of the message's photo list is downloaded to
    ``downloaded.jpg`` and handed to ``cam.predict_face``. When that returns
    None the user is told that no faces were found; otherwise the result is
    sent back to the user.

    :param bot: bot used to resolve the photo's download URL
    :param update: incoming update carrying the photo
    :return: None
    """
    # NOTE(review): the download path is fixed, so every request writes to
    # the same file and it is never removed here - confirm this is intended.
    download_path = "downloaded.jpg"

    # Resolve the attached photo to a download URL and fetch it.
    photo = update.message.photo[-1]
    remote_url = bot.get_file(photo.file_id).file_path
    urllib.request.urlretrieve(remote_url, download_path)

    prediction = cam.predict_face(download_path)
    if prediction is not None:
        cam.telegram_handler.send_image(prediction, update.message.from_user.id)
        return

    update.message.reply_text("Sorry...no faces found", parse_mode="HTML")
# ===============Utils===================
def complete_flags():
    """Build the flags message: FLAG_SEND followed by one status line per flag.

    Each line has the form "-- <b>Label</b>" followed by a check mark when
    the flag is truthy and a cross otherwise.

    :return: the complete HTML-formatted text
    """
    motion = cam.motion
    # (label, current value) pairs, in the order they appear in the message.
    flag_states = (
        ("Motion Detection", motion.motion_flag),
        ("Video", motion.video_flag),
        ("Movement square", motion.green_squares),
        ("Face Photo", motion.face_photo_flag),
        ("Face Reco", motion.face_reco_falg),
        ("Debug", motion.debug_flag),
    )
    status_lines = [
        "\n-- <b>" + label + "</b>" + (" ✅" if enabled else " ❌")
        for label, enabled in flag_states
    ]
    return FLAG_SEND + "".join(status_lines)