Hello everyone,
I am a French engineering student, and I am currently trying to write a program for a Parrot Anafi drone using Olympe (v7.1.0). I am running my programs in an Ubuntu 20.04 VM under VirtualBox. I want to recognize people while the drone is flying. I tried the “Streaming.py” example program and partly succeeded: the drone takes off, then the streaming starts, but only the first image is processed.
A rectangle is drawn around every person in the drone’s field of view, labelling them as people, but I can’t figure out how to process the video stream in real time.
I tried to find an answer online and watched lots of tutorials, but nothing answered my question. I am a beginner and would really appreciate some help. Thank you for your time and your answers.
I only modified the “def display_frame(self, yuv_frame):” part.
I am using the COCO dataset; the model files are in my project folder.
Here is the streaming.py program with my openCV treatment:
import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing, moveTo, NavigateHome
import threading
import time
import queue
import cv2
import logging
class OlympeStreaming(threading.Thread):
    """Worker thread that shows the drone's video stream with person boxes.

    Olympe hands decoded YUV frames to :meth:`yuv_frame_cb`, which queues
    them. The thread body (:meth:`run`) takes frames off the queue, converts
    them to BGR, runs an OpenCV DNN detector on them and displays the result
    in an OpenCV window.

    The thread itself is started by the constructor; :meth:`start` is
    overridden to start the *video streaming* on the drone instead.
    """

    # Files read by the detector, resolved relative to the working directory.
    CLASS_FILE = "coco.names"
    CONFIG_PATH = "ssd_mobilenet_v3_large_coco_2020_01_14.pbtxt"
    WEIGHTS_PATH = "frozen_inference_graph.pb"
    # Only detections with this class id are drawn (the "person" class in the
    # original post -- confirm against the coco.names file you use).
    PERSON_CLASS_ID = 1
    CONF_THRESHOLD = 0.6

    def __init__(self, drone):
        """Store the drone, create the frame queue and start the worker thread.

        :param drone: connected ``olympe.Drone`` whose ``streaming`` attribute
            is used by :meth:`start` and :meth:`stop`.
        """
        self.drone = drone
        self.frame_queue = queue.Queue()
        self.flush_queue_lock = threading.Lock()
        self.frame_num = 0
        self.renderer = None
        # Detector state, filled in lazily by _ensure_detector() so the model
        # files are read once instead of once per frame.
        self._class_names = None
        self._net = None
        super().__init__()
        super().start()

    def start(self):
        """Register the streaming callbacks and start the video stream.

        Note that this overrides ``threading.Thread.start``; the thread has
        already been started by ``__init__``.
        """
        # Setup your callback functions to do some live video processing
        self.drone.streaming.set_callbacks(
            raw_cb=self.yuv_frame_cb,
            h264_cb=self.h264_frame_cb,
            start_cb=self.start_cb,
            end_cb=self.end_cb,
            flush_raw_cb=self.flush_cb,
        )
        # Start video streaming
        self.drone.streaming.start()

    def stop(self):
        """Stop the optional renderer, then stop the video stream."""
        if self.renderer is not None:
            self.renderer.stop()
        # Properly stop the video stream and disconnect
        self.drone.streaming.stop()

    def yuv_frame_cb(self, yuv_frame):
        """
        This function will be called by Olympe for each decoded YUV frame.

        The frame is ref'd before being queued; whoever takes it off the
        queue is responsible for the matching ``unref()``.

        :type yuv_frame: olympe.VideoFrame
        """
        yuv_frame.ref()
        self.frame_queue.put_nowait(yuv_frame)

    def flush_cb(self, stream):
        """Drop (and unref) every queued frame of an I420 stream.

        :param stream: mapping describing the stream; only its
            ``"vdef_format"`` entry is read.
        :return: always ``True``.
        """
        if stream["vdef_format"] != olympe.VDEF_I420:
            return True
        with self.flush_queue_lock:
            while not self.frame_queue.empty():
                self.frame_queue.get_nowait().unref()
        return True

    def start_cb(self):
        """Streaming start callback; intentionally does nothing."""

    def end_cb(self):
        """Streaming end callback; intentionally does nothing."""

    def h264_frame_cb(self, h264_frame):
        """Encoded-frame callback; intentionally does nothing."""

    def _ensure_detector(self):
        """Load the class names and the detection network on first use."""
        if self._net is not None:
            return
        with open(self.CLASS_FILE, "rt") as f:
            self._class_names = f.read().rstrip("\n").split("\n")
        net = cv2.dnn_DetectionModel(self.WEIGHTS_PATH, self.CONFIG_PATH)
        net.setInputSize(320, 320)
        net.setInputScale(1.0 / 127.5)
        net.setInputMean((127.5, 127.5, 127.5))
        net.setInputSwapRB(True)
        # Publish the network last so a failed load is retried on the next
        # frame instead of leaving a half-configured detector behind.
        self._net = net

    def _latest_frame(self, yuv_frame):
        """Return the newest queued frame, unref'ing every older one.

        Detection can be slower than the stream; processing every frame in
        order would make the display lag further and further behind while
        the queued frames keep their references on the video buffer pool.
        """
        while True:
            try:
                newer = self.frame_queue.get_nowait()
            except queue.Empty:
                return yuv_frame
            yuv_frame.unref()
            yuv_frame = newer

    def display_frame(self, yuv_frame):
        """Convert one YUV frame to BGR, draw person detections and show it.

        :param yuv_frame: ``olympe.VideoFrame`` in I420 or NV12 format.
        :raises KeyError: if the frame format is neither I420 nor NV12.
        """
        # yuv_frame.info() / yuv_frame.vmeta() give access to the resolution
        # and to drone metadata if they are ever needed here.

        # convert pdraw YUV flag to OpenCV YUV flag
        cv2_cvt_color_flag = {
            olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
            olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
        }[yuv_frame.format()]

        # yuv_frame.as_ndarray() is a 2D numpy array with the proper "shape"
        # i.e (3 * height / 2, width) because it's a YUV I420 or NV12 frame
        cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)

        self._ensure_detector()
        class_ids, confs, bbox = self._net.detect(
            cv2frame, confThreshold=self.CONF_THRESHOLD
        )
        # detect() may return an empty tuple or a numpy array, so test the
        # length rather than the truthiness.
        if len(class_ids) != 0:
            for class_id, confidence, box in zip(
                class_ids.flatten(), confs.flatten(), bbox
            ):
                if class_id != self.PERSON_CLASS_ID:
                    continue
                cv2.rectangle(cv2frame, box, color=(0, 255, 0), thickness=2)
                cv2.putText(
                    cv2frame,
                    self._class_names[class_id - 1].upper(),
                    (box[0] + 10, box[1] + 30),
                    cv2.FONT_HERSHEY_COMPLEX, 1, (0, 255, 0), 2,
                )
                cv2.putText(
                    cv2frame,
                    str(round(confidence * 100, 2)),
                    (box[0] + 250, box[1] + 30),
                    cv2.FONT_HERSHEY_COMPLEX, 1, (0, 255, 0), 2,
                )

        cv2.imshow("Frames via Olympe", cv2frame)
        cv2.waitKey(1)

    def run(self):
        """Thread body: display the newest frame until the main thread exits."""
        main_thread = threading.main_thread()
        while main_thread.is_alive():
            with self.flush_queue_lock:
                try:
                    yuv_frame = self.frame_queue.get(timeout=0.01)
                except queue.Empty:
                    continue
                yuv_frame = self._latest_frame(yuv_frame)
                try:
                    self.display_frame(yuv_frame)
                except Exception:
                    # Keep the display loop alive, but do not hide the cause.
                    logging.getLogger(__name__).exception(
                        "Failed to process video frame"
                    )
                finally:
                    # Don't forget to unref the yuv frame. We don't want to
                    # starve the video buffer pool
                    yuv_frame.unref()
logger = logging.getLogger(__name__)


def main(ip="192.168.42.1", flight_seconds=10):
    """Connect to the drone, take off, stream video for a while, then land.

    :param ip: address of the drone to connect to.
    :param flight_seconds: how long to keep streaming before landing.
    """
    # eventually IP will be specified depending on what drone is chosen
    drone = olympe.Drone(ip)
    drone.connect()
    streamer = None
    try:
        drone(TakeOff()).wait().success()
        streamer = OlympeStreaming(drone)
        streamer.start()
        ### Flight commands here ###
        time.sleep(flight_seconds)
    finally:
        # Always try to land and disconnect, even if streaming failed.
        try:
            if streamer is not None:
                streamer.stop()
        finally:
            drone(Landing()).wait().success()
            drone.disconnect()


if __name__ == "__main__":
    main()