How to conver YUV flag to OpenCV YUV flag

hello,i try to show video using openCV,cv2.imshow() function,but there are not show video from parrot anafi.
could you help me?

im using this code

 # convert pdraw YUV flag to OpenCV YUV flag
        # import cv2
        cv2_cvt_color_flag = {
            olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
            olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
        }[yuv_frame.format()]
        cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
        img = cv2.cvtColor(cv2frame, cv2.COLOR_RGB2GRAY)
        cv2.imshow('detect',img)

Hi @mamorubaseball ,

Here is a minimal example I posted in another thread. It should display the frames in a cv2 window.

import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing, moveTo, NavigateHome
import threading
import time
import queue
import cv2
import logging


class OlympeStreaming(threading.Thread):
    def __init__(self, drone):
        self.drone = drone
        self.frame_queue = queue.Queue()
        self.flush_queue_lock = threading.Lock()
        self.frame_num = 0 
        self.renderer = None
        super().__init__()
        super().start()


    def start(self):
        # Setup your callback functions to do some live video processing
        self.drone.streaming.set_callbacks(
            raw_cb=self.yuv_frame_cb,
            h264_cb=self.h264_frame_cb,
            start_cb=self.start_cb,
            end_cb=self.end_cb,
            flush_raw_cb=self.flush_cb,
        )
        # Start video streaming
        self.drone.streaming.start()
        #self.renderer = PdrawRenderer(pdraw=self.drone.streaming)

    def stop(self):
        if self.renderer is not None:
            self.renderer.stop()
        # Properly stop the video stream and disconnect
        self.drone.streaming.stop()

    def yuv_frame_cb(self, yuv_frame):
        """
        This function will be called by Olympe for each decoded YUV frame.
            :type yuv_frame: olympe.VideoFrame
        """
        yuv_frame.ref()
        self.frame_queue.put_nowait(yuv_frame)

    def flush_cb(self, stream):
        if stream["vdef_format"] != olympe.VDEF_I420:
            return True
        with self.flush_queue_lock:
            while not self.frame_queue.empty():
                self.frame_queue.get_nowait().unref()
        return True

    def start_cb(self):
        pass

    def end_cb(self):
        pass

    def h264_frame_cb(self, h264_frame):
        pass

    def display_frame(self, yuv_frame):
        # the VideoFrame.info() dictionary contains some useful information
        # such as the video resolution
        info = yuv_frame.info()

        height, width = (  # noqa
            info["raw"]["frame"]["info"]["height"],
            info["raw"]["frame"]["info"]["width"],
        )

        # yuv_frame.vmeta() returns a dictionary that contains additional
        # metadata from the drone (GPS coordinates, battery percentage, ...)
        # convert pdraw YUV flag to OpenCV YUV flag
        cv2_cvt_color_flag = {
            olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
            olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
        }[yuv_frame.format()]

        # yuv_frame.as_ndarray() is a 2D numpy array with the proper "shape"
        # i.e (3 * height / 2, width) because it's a YUV I420 or NV12 frame

        # Use OpenCV to convert the yuv frame to RGB
        cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
        cv2.imshow("Frames via Olympe", cv2frame)
        cv2.waitKey(1)

    def run(self):
        main_thread = next(
            filter(lambda t: t.name == "MainThread", threading.enumerate())
        )
        while main_thread.is_alive():
            with self.flush_queue_lock:
                try:
                    yuv_frame = self.frame_queue.get(timeout=0.01)
                except queue.Empty:
                    continue
                try:
                    self.display_frame(yuv_frame)
                except Exception as e:
                    print(e)
                finally:
                    # Don't forget to unref the yuv frame. We don't want to
                    # starve the video buffer pool
                    yuv_frame.unref()



logger = logging.getLogger(__name__)

if __name__ == "__main__":
        
    #eventually IP will be specified depending on what drone is chosen
    IP = "10.202.0.1"
    drone = olympe.Drone(IP)
    drone.connect()
    drone(TakeOff()).wait().success()
    
    streamer = OlympeStreaming(drone)
    streamer.start()

    ### Flight commands here ###
    time.sleep(300)
    
    streamer.stop()
     
    drone(Landing()).wait().success()
    drone.disconnect()

hello teiszler,thanks a lot !!
I can show video with this script , but I can’t move drone on showing video.
This is script which make drone to detect something and move for tracking.

def display_frame(self, yuv_frame):
        # the VideoFrame.info() dictionary contains some useful information
        # such as the video resolution
        info = yuv_frame.info()

        height, width = (  # noqa
            info["raw"]["fsrame"]["info"]["height"],
            info["raw"]["frame"]["info"]["width"],
        )

        cv2_cvt_color_flag = {
            olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
            olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
        }[yuv_frame.format()]

        cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
         aruco = cv2.aruco
dictionary = aruco.getPredefinedDictionary(aruco.DICT_4X4_50)
        corners, ids, rejectedImgPoints = aruco.detectMarkers(cv2frame, dictionary) #マーカを検出
        aruco.drawDetectedMarkers(cv2frame, corners, ids, (0,255,0))
        if ids != None:
            idno = ids[0,0]
            if idno == 0:
                print('kennshutu')
                assert self.drone(FlyingStateChanged(state='hovering')).wait().success()
                assert self.drone(extended_move_by(0,0,0,math.pi,1,0,0)).wait().success()

        cv2.imshow('cv2',cv2frame)
        cv2.waitKey(1)

I cant move during show video.Could you help me?

Hi @mamorubaseball ,

I didn’t do any flight commands for simplicity, but you can just remove the

### Flight commands here ###
time.sleep(300)

and put your flight commands there and it will stream while moving. If you want to do some actuation (i.e. tracking when you detect something), you can send flight commands from the streaming thread, but you may need to cancel whatever the currently executing instruction is. For example, if you told the drone to moveTo a particular location and it is en route, you would need to cancel the moveTo and then issue the new command. We haven’t actually done this yet so I don’t have code to share with you, but looking at the Olympe API, it seems like this should be possible.

Regards,
Tom

1 Like

thanks a lot !
I tyr it

Hi,

I am getting the following error

Drone object has no attribute “streaming” . Any suggestions

thank you

@sibujacob

Is drone an instance of olympe.Drone?

drone = olympe.Drone(IP)

Can you add the entire python script? You can check what members a module has by using dir, so you can try the following in an interactive python session:

$ python 
Python 3.8.10 (default, Sep 28 2021, 16:10:42) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import olympe
>>> dir(olympe.Drone)
['__abstractmethods__', '__call__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__slots__', '__str__', '__subclasshook__', '__weakref__', '_abc_impl', '_async_discover_device', '_async_get_device', '_canceled_cb', '_cmd_itf_send_status_cb', '_connect_impl', '_connected_cb', '_connecting_cb', '_create_backend', '_create_command_interface', '_create_pdraw_interface', '_declare_callbacks', '_destroy_pdraw', '_disconnected_cb', '_disconnection_impl', '_dispose_cmd_cb', '_dispose_pdraw', '_do_connect', '_get_message', '_link_quality_cb', '_link_status_cb', '_on_connected', '_on_connection_state_changed', '_on_device_removed', '_piloting_timer_cb', '_recv_cmd_cb', '_recv_message_type', '_reset_instance', '_send_command_impl', '_send_command_raw', '_send_piloting_command', '_send_protobuf_command', '_send_states_settings_cmd', '_start_piloting_impl', '_stop_piloting_impl', '_subscriber_overrun', '_synchronize_clock', 'async_connect', 'async_disconnect', 'check_state', 'connect', 'connected', 'connection_state', 'decoding_errors', 'destroy', 'disconnect', 'get_last_event', 'get_state', 'media', 'media_autoconnect', 'mission', 'piloting', 'piloting_pcmd', 'query_state', 'register_message', 'schedule', 'scheduler', 'set_device_name', 'start_piloting', 'stop_piloting', 'streaming', 'subscribe', 'unsubscribe', 'update_logger']

If you see ‘streaming’, then olympe.Drone.streaming exists.

Regards,
Tom

1 Like

Hi

Actually i can fly the drone using the python code . But while the program goes to using the streaming example provided here created the attribute error. Let me check as you explained here the interactive session and revert back .

thank you

Hi,

i am posting here your results. Yes there is streaming in the dir …cmd

Hi,
I am still facing the same error. Please note that i make my program thro Skycontroller. It works while doing flight actions but streaming it is failing.

My python code below

from future import print_function
import olympe
import time
import logging
import queue
import threading
from olympe.messages.skyctrl.CoPiloting import setPilotingSource
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing,moveTo, NavigateHome
from olympe.messages.ardrone3.PilotingSettingsState import MaxTiltChanged
from olympe.messages.ardrone3.PilotingSettings import MaxTilt

SKYCTRL_IP = “192.168.53.1”

class OlympeStreaming(threading.Thread):
def init(self, drone):
#self.drone = drone(SKYCTRL_IP)
self.drone = olympe.Drone(SKYCTRL_IP)
self.frame_queue = queue.Queue()
self.flush_queue_lock = threading.Lock()
self.frame_num = 0
self.renderer = None
super().init()
super().start()

def start(self):
    # Setup your callback functions to do some live video processing
    self.drone.streaming.set_callbacks(
        raw_cb=self.yuv_frame_cb,
        h264_cb=self.h264_frame_cb,
        start_cb=self.start_cb,
        end_cb=self.end_cb,
        flush_raw_cb=self.flush_cb,
    )

Start video streaming
self.drone.streaming.start()
#self.renderer = PdrawRenderer(pdraw=self.drone.streaming)

def stop(self):
    if self.renderer is not None:
        self.renderer.stop()
    # Properly stop the video stream and disconnect
    self.drone.streaming.stop()

def yuv_frame_cb(self, yuv_frame):
    """
    This function will be called by Olympe for each decoded YUV frame.
        :type yuv_frame: olympe.VideoFrame
    """
    yuv_frame.ref()
    self.frame_queue.put_nowait(yuv_frame)

def flush_cb(self, stream):
    if stream["vdef_format"] != olympe.VDEF_I420:
        return True
    with self.flush_queue_lock:
        while not self.frame_queue.empty():

self.frame_queue.get_nowait().unref()
return True

def start_cb(self):
    pass

def end_cb(self):
    pass

def h264_frame_cb(self, h264_frame):
    pass

def display_frame(self, yuv_frame):
    # the VideoFrame.info() dictionary contains some useful information
    # such as the video resolution
    info = yuv_frame.info()

    height, width = (  # noqa
        info["raw"]["frame"]["info"]["height"],

info[“raw”][“frame”][“info”][“width”],
)

    # yuv_frame.vmeta() returns a dictionary that contains additional
    # metadata from the drone (GPS coordinates, battery percentage, ...)
    # convert pdraw YUV flag to OpenCV YUV flag
    cv2_cvt_color_flag = {
        olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
        olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
    }[yuv_frame.format()]

    # yuv_frame.as_ndarray() is a 2D numpy array with the proper "shape"
    # i.e (3 * height / 2, width) because it's a YUV I420 or NV12 frame

    # Use OpenCV to convert the yuv frame to RGB
    cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
    cv2.imshow("Frames via Olympe", cv2frame)
    cv2.waitKey(1)

def run(self):
    main_thread = next(
        filter(lambda t: t.name == "MainThread", threading.enumerate())
    )
    while main_thread.is_alive():
        with self.flush_queue_lock:
            try:
                yuv_frame = self.frame_queue.get(timeout=0.01)

except queue.Empty:
continue
try:
self.display_frame(yuv_frame)
except Exception as e:
print(e)
finally:
# Don’t forget to unref the yuv frame. We don’t want to
# starve the video buffer pool
yuv_frame.unref()

logger = logging.getLogger(name)

#def main():
if name == “main”:
drone = olympe.Drone(SKYCTRL_IP)
drone.connect()
drone(setPilotingSource(source=“Controller”)).wait().success()
# print("Drone MaxTilt = ", drone.get_state(MaxTiltChanged)[“current”])
#streamer = OlympeStreaming(drone)
#streamer.start()

assert drone(TakeOff()).wait().success()
time.sleep(3)
drone(moveBy(0.5, 0,0,0)).wait()    #   forward with positive value
streamer = OlympeStreaming(drone)
streamer.start()


# time.sleep(3)
# drone(moveBy(0, 0.5,0,0)).wait()  # Right with psitve value
# time.sleep(3)

drone(moveBy(0, 0, 0,0)).wait()
time.sleep(30)

streamer.stop()
assert drone(Landing()).wait().success()
drone.disconnect()

@sibujacob

We have successfully streamed while connecting via the SkyController as well, so this should not be a problem.

You are creating a new drone object in the streaming thread instead of using the one that you instantiate in the main thread and then pass to it. Instead of
self.drone = olympe.Drone(SKYCTRL_IP)
you should have
self.drone = drone
in OlympeStreaming.init

Regards,
Tom

1 Like

thank you for the feedback . I will check and revert .

Hi

It still is the same attribute error !!

@sibujacob

I think you need to reduce the code until you find the problem. If you just run a tiny script does that work? Something like this:

import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing, moveTo, NavigateHome
import threading
import time
import queue
import cv2
import logging

logger = logging.getLogger(__name__)

if __name__ == "__main__":
        
    #eventually IP will be specified depending on what drone is chosen
    IP = "192.168.53.1"
    drone = olympe.Drone(IP)
    drone.connect()
    drone(TakeOff()).wait().success()
    

    drone.streaming.set_callbacks(
        raw_cb=None,
        h264_cb=None,
        start_cb=None,
        end_cb=None,
        flush_raw_cb=None,
    )
    # Start video streaming
    drone.streaming.start()

    time.sleep(5)
    
    drone.streaming.stop()
     
    drone(Landing()).wait().success()
    drone.disconnect()
1 Like

HI

After a careful look at the code, I have made the following changes to remove the attribute error

**Instead of using drone.streaming.set_callbacks(...)  ,  drone.streaming.start()  , drone.streaming.stop() , we have used here drone.set_streaming_callbacks(...) .   drone.start_video_streaming() and drone.stop_video_streaming .** 

Now i receive the following screenshot which shows RAW ....RAW.....but no cam image is seen 
![Screenshot from 2021-12-12 17-21-44|690x387](upload://cktHX5D9yGasjZDP2mQmlILulDe.png)
![Screenshot from 2021-12-12 17-21-36|690x387](upload://uRjiiGUazYhOGDhd8lYqNINxWxP.png)
![Screenshot from 2021-12-12 17-21-19|690x387](upload://tUh2ZIkRoY1wX9dBwGgEDmQlG7K.png)

Hi @sibujacob ,

I think this means you are using a version of Olympe prior to 7.0. If you do the following in an interactive python session, it should tell you what version you have installed:

import olympe
print(olympe.__version__)

When Parrot moved to 7.0, we had to update our code in a few locations including moving set_callbacks under the streaming module. This is part of the diff of the commit I made to our code:


-        self.drone.set_streaming_callbacks(
+        self.drone.streaming.set_callbacks(

-        self.drone.start_video_streaming()
+        self.drone.streaming.start()

Below is an example that should work in Olympe 3.0.0:

import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing, moveTo, NavigateHome
import threading
import time
import queue
import cv2
import logging


class OlympeStreaming(threading.Thread):

    def __init__(self, drone, sample_rate=2):
        self.drone = drone
        self.frame_queue = queue.Queue()
        self.flush_queue_lock = threading.Lock()
        super().__init__()
        super().start()

    def start(self):
        # Setup your callback functions to do some live video processing
        self.drone.set_streaming_callbacks(
            raw_cb=self.yuv_frame_cb,
            h264_cb=self.h264_frame_cb,
            start_cb=self.start_cb,
            end_cb=self.end_cb,
            flush_raw_cb=self.flush_cb,
        )
        # Start video streaming
        self.drone.start_video_streaming()

    def stop(self):
        # Properly stop the video stream and disconnect
        self.drone.stop_video_streaming()
        self.drone.disconnect()

    def yuv_frame_cb(self, yuv_frame):
        """
        This function will be called by Olympe for each decoded YUV frame.
            :type yuv_frame: olympe.VideoFrame
        """
        yuv_frame.ref()
        self.frame_queue.put_nowait(yuv_frame)

    def flush_cb(self):
        with self.flush_queue_lock:
            while not self.frame_queue.empty():
                self.frame_queue.get_nowait().unref()
        return True

    def start_cb(self):
        pass

    def end_cb(self):
        pass

    def h264_frame_cb(self, h264_frame):
        pass

    def send_yuv_frame_to_server(self, yuv_frame):
        # the VideoFrame.info() dictionary contains some useful information
        # such as the video resolution
        info = yuv_frame.info()
        height, width = info["yuv"]["height"], info["yuv"]["width"]

        # yuv_frame.vmeta() returns a dictionary that contains additional
        # metadata from the drone (GPS coordinates, battery percentage, ...)

        # convert pdraw YUV flag to OpenCV YUV flag
        cv2_cvt_color_flag = {
            olympe.PDRAW_YUV_FORMAT_I420: cv2.COLOR_YUV2BGR_I420,
            olympe.PDRAW_YUV_FORMAT_NV12: cv2.COLOR_YUV2BGR_NV12,
        }[info["yuv"]["format"]]

        # yuv_frame.as_ndarray() is a 2D numpy array with the proper "shape"
        # i.e (3 * height / 2, width) because it's a YUV I420 or NV12 frame

        # Use OpenCV to convert the yuv frame to RGB
        cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
        cv2.imshow("Frames via Olympe", cv2frame)
        cv2.waitKey(1)

    def run(self):
        main_thread = next(
            filter(lambda t: t.name == "MainThread", threading.enumerate())
        )
        while main_thread.is_alive():
            with self.flush_queue_lock:
                try:
                    yuv_frame = self.frame_queue.get(timeout=0.01)
                except queue.Empty:
                    continue
                try:
                    self.send_yuv_frame_to_server(yuv_frame)
                except Exception as e:
                    print(e)
                finally:
                    # Don't forget to unref the yuv frame. We don't want to
                    # starve the video buffer pool
                    yuv_frame.unref()


logger = logging.getLogger(__name__)

if __name__ == "__main__":

    # eventually IP will be specified depending on what drone is chosen
    IP = "192.168.53.1"
    drone = olympe.Drone(IP)
    drone.connect()
    drone(TakeOff()).wait().success()

    streamer = OlympeStreaming(drone)
    streamer.start()

    ### Flight commands here ###
    time.sleep(300)

    streamer.stop()

    drone(Landing()).wait().success()
    drone.disconnect()
1 Like

Hi,

Great !! It is now working fine. I am able to see video …Is there some link where i can contribute to your work …thank you very much…

No problem. You’ll have to update a few of those lines if/when you move to Olympe 7.0.0.

No contribution is necessary. Currently our work is in a private repo, but eventually we plan to make it public and open-source. It is part of one of our student’s PhD research.

Regards,
Tom

2 Likes

What i feel strange is i took the Docker direction and yet there too I have the 3.0.0 version and not ver 7! No idea

@sibujacob According to that thread, the docker container was built about 5 months ago. So 3.0 is what was available at that time. You would have to update the Dockerfile to install 7.0 if you wanted to run the environment inside a container. We have a built a Dockerfile for 7.0 that basically just encapsulates the steps in the installation guide from Parrot

1 Like