Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gen2 license plate example separate veh nn #98

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gen2-license-plate-recognition/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ This example demonstrates how to run 2 stage inference on DepthAI using Gen2 Pip
First, a license plate is detected on the image and then the cropped license frame is sent to text detection network,
which tries to decode the license plates texts


> :warning: **This demo is adjusted to detect and recognize chinese license plates**! It may not work with other license plates

## Demo
Expand Down
172 changes: 125 additions & 47 deletions gen2-license-plate-recognition/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ def to_planar(arr: np.ndarray, shape: tuple) -> list:
def create_pipeline():
print("Creating pipeline...")
pipeline = dai.Pipeline()
pipeline.setOpenVINOVersion(version=dai.OpenVINO.Version.VERSION_2020_1)

if args.camera:
print("Creating Color Camera...")
cam = pipeline.createColorCamera()
cam.setPreviewSize(300, 300)
cam.setPreviewSize(672, 384)
cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam.setInterleaved(False)
cam.setBoardSocket(dai.CameraBoardSocket.RGB)
Expand All @@ -58,11 +59,10 @@ def create_pipeline():
cam.preview.link(cam_xout.input)

# NeuralNetwork
print("Creating Detection Neural Network...")
print("Creating License Plates Detection Neural Network...")
det_nn = pipeline.createMobileNetDetectionNetwork()
det_nn.setConfidenceThreshold(0.5)
det_nn.setBlobPath(str(Path("models/vehicle-license-plate-detection-barrier-0106.blob").resolve().absolute()))
det_nn.setNumInferenceThreads(2)
det_nn.input.setQueueSize(1)
det_nn.input.setBlocking(False)
det_nn_xout = pipeline.createXLinkOut()
Expand All @@ -73,12 +73,37 @@ def create_pipeline():
det_nn.passthrough.link(det_pass.input)

if args.camera:
cam.preview.link(det_nn.input)
manip = pipeline.createImageManip()
manip.initialConfig.setResize(300, 300)
manip.initialConfig.setFrameType(dai.RawImgFrame.Type.BGR888p)
cam.preview.link(manip.inputImage)
manip.out.link(det_nn.input)
else:
det_xin = pipeline.createXLinkIn()
det_xin.setStreamName("det_in")
det_xin.out.link(det_nn.input)

# NeuralNetwork
print("Creating Vehicle Detection Neural Network...")
veh_nn = pipeline.createMobileNetDetectionNetwork()
veh_nn.setConfidenceThreshold(0.5)
veh_nn.setBlobPath(str(Path("models/vehicle-detection-adas-0002.blob").resolve().absolute()))
veh_nn.input.setQueueSize(1)
veh_nn.input.setBlocking(False)
veh_nn_xout = pipeline.createXLinkOut()
veh_nn_xout.setStreamName("veh_nn")
veh_nn.out.link(veh_nn_xout.input)
veh_pass = pipeline.createXLinkOut()
veh_pass.setStreamName("veh_pass")
veh_nn.passthrough.link(veh_pass.input)

if args.camera:
cam.preview.link(veh_nn.input)
else:
veh_xin = pipeline.createXLinkIn()
veh_xin.setStreamName("veh_in")
veh_xin.out.link(veh_nn.input)

rec_nn = pipeline.createNeuralNetwork()
rec_nn.setBlobPath(str((Path(__file__).parent / Path('models/license-plate-recognition-barrier-0007.blob')).resolve().absolute()))
rec_nn.input.setBlocking(False)
Expand Down Expand Up @@ -150,12 +175,14 @@ def fps(self):


running = True
detections = []
license_detections = []
vehicle_detections = []
rec_results = []
attr_results = []
licese_frame = None
frame_det_seq = 0
frame_det_seq_map = {}
frame_seq_map = {}
veh_last_seq = 0
lic_last_seq = 0

if args.camera:
fps = FPSHandler()
Expand All @@ -164,42 +191,66 @@ def fps(self):
fps = FPSHandler(cap)


def det_thread(det_queue, det_pass, rec_queue, attr_queue):
global detections, licese_frame
def lic_thread(det_queue, det_pass, rec_queue):
global license_detections, lic_last_seq

while running:
try:
detections = det_queue.get().detections
in_det = det_queue.get().detections
in_pass = det_pass.get()
orig_frame = frame_det_seq_map.get(in_pass.getSequenceNum(), None)

orig_frame = frame_seq_map.get(in_pass.getSequenceNum(), None)
if orig_frame is None:
print("No matching frame found, skipping...")
continue

for map_key in list(filter(lambda item: item <= in_pass.getSequenceNum(), frame_det_seq_map.keys())):
del frame_det_seq_map[map_key]
license_detections = [detection for detection in in_det if detection.label == 2]

for detection in detections:
for detection in license_detections:
bbox = frame_norm(orig_frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
cropped_frame = orig_frame[bbox[1]:bbox[3], bbox[0]:bbox[2]]

tstamp = time.monotonic()
img = dai.ImgFrame()
img.setTimestamp(tstamp)
img.setType(dai.RawImgFrame.Type.BGR888p)
img.setData(to_planar(cropped_frame, (94, 24)))
img.setWidth(94)
img.setHeight(24)
rec_queue.send(img)

if detection.label == 2: # license plate
img.setData(to_planar(cropped_frame, (94, 24)))
img.setWidth(94)
img.setHeight(24)
rec_queue.send(img)
elif detection.label == 1: # car
img.setData(to_planar(cropped_frame, (72, 72)))
img.setWidth(72)
img.setHeight(72)
attr_queue.send(img)

fps.tick('det')
fps.tick('lic')
except RuntimeError:
continue


def veh_thread(det_queue, det_pass, attr_queue):
global vehicle_detections, veh_last_seq

while running:
try:
vehicle_detections = det_queue.get().detections
in_pass = det_pass.get()

orig_frame = frame_seq_map.get(in_pass.getSequenceNum(), None)
if orig_frame is None:
continue

veh_last_seq = in_pass.getSequenceNum()

for detection in vehicle_detections:
bbox = frame_norm(orig_frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
cropped_frame = orig_frame[bbox[1]:bbox[3], bbox[0]:bbox[2]]

tstamp = time.monotonic()
img = dai.ImgFrame()
img.setTimestamp(tstamp)
img.setType(dai.RawImgFrame.Type.BGR888p)
img.setData(to_planar(cropped_frame, (72, 72)))
img.setWidth(72)
img.setHeight(72)
attr_queue.send(img)

fps.tick('veh')
except RuntimeError:
continue

Expand Down Expand Up @@ -264,17 +315,22 @@ def attr_thread(q_attr, q_pass):
cam_out = device.getOutputQueue("cam_out", 1, True)
else:
det_in = device.getInputQueue("det_in")
veh_in = device.getInputQueue("veh_in")
rec_in = device.getInputQueue("rec_in")
attr_in = device.getInputQueue("attr_in")
det_nn = device.getOutputQueue("det_nn", 1, False)
det_pass = device.getOutputQueue("det_pass", 1, False)
veh_nn = device.getOutputQueue("veh_nn", 1, False)
veh_pass = device.getOutputQueue("veh_pass", 1, False)
rec_nn = device.getOutputQueue("rec_nn", 1, False)
rec_pass = device.getOutputQueue("rec_pass", 1, False)
attr_nn = device.getOutputQueue("attr_nn", 1, False)
attr_pass = device.getOutputQueue("attr_pass", 1, False)

det_t = threading.Thread(target=det_thread, args=(det_nn, det_pass, rec_in, attr_in))
det_t = threading.Thread(target=lic_thread, args=(det_nn, det_pass, rec_in))
det_t.start()
veh_t = threading.Thread(target=veh_thread, args=(veh_nn, veh_pass, attr_in))
veh_t.start()
rec_t = threading.Thread(target=rec_thread, args=(rec_nn, rec_pass))
rec_t.start()
attr_t = threading.Thread(target=attr_thread, args=(attr_nn, attr_pass))
Expand All @@ -286,10 +342,20 @@ def should_run():


def get_frame():
global frame_det_seq

if args.video:
return cap.read()
read_correctly, frame = cap.read()
if read_correctly:
frame_seq_map[frame_det_seq] = frame
frame_det_seq += 1
return read_correctly, frame
else:
return True, cam_out.get().getCvFrame()
in_rgb = cam_out.get()
frame = in_rgb.getCvFrame()
frame_seq_map[in_rgb.getSequenceNum()] = frame

return True, frame


try:
Expand All @@ -298,40 +364,51 @@ def get_frame():

if not read_correctly:
break

for map_key in list(filter(lambda item: item <= min(lic_last_seq, veh_last_seq), frame_seq_map.keys())):
del frame_seq_map[map_key]

fps.next_iter()

if not args.camera:
tstamp = time.monotonic()
img = dai.ImgFrame()
img.setData(to_planar(frame, (300, 300)))
img.setTimestamp(tstamp)
img.setSequenceNum(frame_det_seq)
img.setType(dai.RawImgFrame.Type.BGR888p)
img.setWidth(300)
img.setHeight(300)
det_in.send(img)
frame_det_seq_map[frame_det_seq] = frame
frame_det_seq += 1
lic_frame = dai.ImgFrame()
lic_frame.setData(to_planar(frame, (300, 300)))
lic_frame.setTimestamp(tstamp)
lic_frame.setSequenceNum(frame_det_seq)
lic_frame.setType(dai.RawImgFrame.Type.BGR888p)
lic_frame.setWidth(300)
lic_frame.setHeight(300)
det_in.send(lic_frame)
veh_frame = dai.ImgFrame()
veh_frame.setData(to_planar(frame, (300, 300)))
veh_frame.setTimestamp(tstamp)
veh_frame.setSequenceNum(frame_det_seq)
veh_frame.setType(dai.RawImgFrame.Type.BGR888p)
veh_frame.setWidth(300)
veh_frame.setHeight(300)
veh_frame.setData(to_planar(frame, (672, 384)))
veh_frame.setWidth(672)
veh_frame.setHeight(384)
veh_in.send(veh_frame)

if debug:
debug_frame = frame.copy()
for detection in detections:
for detection in license_detections + vehicle_detections:
bbox = frame_norm(debug_frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
cv2.rectangle(debug_frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (255, 0, 0), 2)
cv2.putText(debug_frame, f"RGB FPS: {round(fps.fps(), 1)}", (5, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
(0, 255, 0))
cv2.putText(debug_frame, f"NN FPS: {round(fps.tick_fps('det'), 1)}", (5, 30), cv2.FONT_HERSHEY_SIMPLEX,
cv2.putText(debug_frame, f"LIC FPS: {round(fps.tick_fps('lic'), 1)}", (5, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.5, (0, 255, 0))
cv2.putText(debug_frame, f"REC FPS: {round(fps.tick_fps('rec'), 1)}", (5, 45), cv2.FONT_HERSHEY_SIMPLEX,
cv2.putText(debug_frame, f"VEH FPS: {round(fps.tick_fps('veh'), 1)}", (5, 45), cv2.FONT_HERSHEY_SIMPLEX,
0.5, (0, 255, 0))
cv2.putText(debug_frame, f"ATTR FPS: {round(fps.tick_fps('attr'), 1)}", (5, 60), cv2.FONT_HERSHEY_SIMPLEX,
cv2.putText(debug_frame, f"REC FPS: {round(fps.tick_fps('rec'), 1)}", (5, 60), cv2.FONT_HERSHEY_SIMPLEX,
0.5, (0, 255, 0))
cv2.putText(debug_frame, f"ATTR FPS: {round(fps.tick_fps('attr'), 1)}", (5, 75), cv2.FONT_HERSHEY_SIMPLEX,
0.5, (0, 255, 0))
cv2.imshow("rgb", debug_frame)

if licese_frame is not None:
cv2.imshow("license plate", licese_frame)

rec_stacked = None

for rec_img, rec_text in rec_results:
Expand Down Expand Up @@ -377,6 +454,7 @@ def get_frame():
det_t.join()
rec_t.join()
attr_t.join()
veh_t.join()
print("FPS: {:.2f}".format(fps.fps()))
if not args.camera:
cap.release()
Binary file not shown.