-
Notifications
You must be signed in to change notification settings - Fork 0
/
camera.py
101 lines (84 loc) · 3.17 KB
/
camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# %%
import json
import os
import time
import uuid
from datetime import datetime

import cv2
from azure.iot.device import IoTHubDeviceClient
from azure.storage.blob import BlobClient
# %%
class device:
    """Camera device that previews frames and reports snapshots to Azure IoT Hub.

    Every ``frame_cap`` frames a snapshot is written under ``images_path``,
    a telemetry message describing it is sent through the IoT Hub client,
    and the image file is uploaded to the blob storage linked to the hub.
    """

    def __init__(self, connection):
        """Create the IoT Hub client and open camera index 0.

        Args:
            connection: Connection string handed to
                ``IoTHubDeviceClient.create_from_connection_string``.
        """
        self.guid = str(uuid.uuid4())
        self.client = IoTHubDeviceClient.create_from_connection_string(
            connection)
        self.vid = cv2.VideoCapture(0)
        # Number of frames read between two saved snapshots.
        self.frame_cap = 1000
        # Number of snapshot files kept before file names are recycled.
        self.image_cap = 10
        # Running counters; both should start at 0.
        self.frame_no = 0
        self.image_no = 0
        self.images_path = os.path.join('assets', 'images')

    def wait_for_message(self):
        """Return whatever ``self.client.receive_message()`` returns."""
        return self.client.receive_message()

    def sleep(self, n):
        """Pause the calling thread for ``n`` seconds; always returns None."""
        time.sleep(n)
        return None

    def activate_monitor(self):
        """Show the live camera feed and publish a snapshot every ``frame_cap`` frames.

        The loop ends when the 'q' key is pressed in the preview window or
        when the camera stops delivering frames. The capture device and all
        OpenCV windows are released on exit, including when an error
        propagates out of the loop.
        """
        try:
            while True:
                ret, frame = self.vid.read()
                if not ret:
                    # read() reported failure, so there is no usable frame;
                    # stop instead of passing it on to imshow.
                    print("unable to read frame from camera")
                    break

                cv2.imshow('frame', frame)
                # 'q' is the quit key for the preview window.
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

                self.frame_no += 1
                # >= rather than == so the snapshot still fires if frame_cap
                # is lowered below the current counter.
                if self.frame_no >= self.frame_cap:
                    self.frame_no = 0
                    print(f"{self.frame_cap}th frame reached")
                    image_name = self.save_files(frame)
                    self.post_data(image_name)
                    result = self.upload_to_blob(image_name)
                    print(result)
        finally:
            # Release the capture object and destroy all windows.
            self.vid.release()
            cv2.destroyAllWindows()

    def save_files(self, frame):
        """Write ``frame`` as a JPEG under ``images_path`` and return its file name.

        File names cycle through ``frame_0.jpg`` .. ``frame_{image_cap-1}.jpg``
        so at most ``image_cap`` snapshots are kept.

        Args:
            frame: Image as returned by ``cv2.VideoCapture.read``.

        Returns:
            The file name (not the full path) used for this snapshot.
        """
        image_name = f'frame_{self.image_no}.jpg'
        # Create the target directory up front so a missing directory does
        # not make the write fail.
        os.makedirs(self.images_path, exist_ok=True)
        if not cv2.imwrite(os.path.join(self.images_path, image_name), frame):
            print(f"unable to save image : {image_name}")
        # Wrap with modulo so the counter restarts at 0 and exactly
        # image_cap names are used; max() guards against a zero cap.
        self.image_no = (self.image_no + 1) % max(self.image_cap, 1)
        return image_name

    def post_data(self, image_name):
        """Send a JSON telemetry message describing a saved snapshot.

        The message carries the camera GUID, the current local time, the
        configured frame interval (``frame_cap``) and the image file name.

        Args:
            image_name: File name returned by ``save_files``.
        """
        MSG_TXT = {"camera": self.guid,
                   "time": str(datetime.now()),
                   "frame": self.frame_cap,
                   "image_name": image_name}
        # The message API needs double-quoted JSON. json.dumps produces it
        # and escapes any quotes inside the values, which a plain
        # quote-replace on str(dict) does not.
        self.client.send_message(json.dumps(MSG_TXT))
        print(str(MSG_TXT))

    def upload_to_blob(self, image_name):
        """Upload a saved snapshot to the blob storage linked to the IoT Hub.

        Args:
            image_name: File name of an image inside ``images_path``.

        Returns:
            The result of ``BlobClient.upload_blob`` on success. On failure
            the caught exception is returned (not raised) after the IoT Hub
            client has been disconnected and reconnected.
        """
        correlation_id = None
        try:
            blob_info = self.client.get_storage_info_for_blob(image_name)
            correlation_id = blob_info.get('correlationId')
            sas_url = (
                f"https://{blob_info['hostName']}/"
                f"{blob_info['containerName']}/"
                f"{blob_info['blobName']}{blob_info['sasToken']}"
            )
            # The with-blocks close both the blob client and the file.
            with BlobClient.from_blob_url(sas_url) as blob_client:
                with open(os.path.join(self.images_path, image_name), "rb") as f:
                    result = blob_client.upload_blob(f, overwrite=True)
        except Exception as e:
            # Deliberate catch-all: a failed upload must not stop the
            # monitor loop. Reset the hub connection and report the error.
            print("unable to upload : ", e)
            self.client.disconnect()
            self.client.connect()
            self._notify_upload(correlation_id, False, 500, str(e))
            return e
        self._notify_upload(correlation_id, True, 200, f"OK: {image_name}")
        return result

    def _notify_upload(self, correlation_id, is_success, status_code,
                       description):
        """Best-effort report of a blob upload outcome to IoT Hub.

        Does nothing when no correlation id was obtained. A failure to
        notify is printed rather than raised.
        """
        if correlation_id is None:
            return
        try:
            self.client.notify_blob_upload_status(
                correlation_id, is_success, status_code, description)
        except Exception as e:
            print("unable to notify upload status : ", e)
# %%