Streaming.py PDraw

Hello there, I am struggling to use the streaming.py example from the Olympe documentation (olympe/src/olympe/doc/examples/streaming.py at master · Parrot-Developers/olympe · GitHub)
Here is the code I use :

import csv
import math
import os
import queue
import shlex
import subprocess
import tempfile
import threading
import time

import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, Landing
from olympe.messages.ardrone3.Piloting import moveBy
from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
from olympe.messages.ardrone3.PilotingSettings import MaxTilt
from olympe.messages.ardrone3.PilotingSettingsState import MaxTiltChanged
from olympe.messages.ardrone3.GPSSettingsState import GPSFixStateChanged
from olympe.video.renderer import PdrawRenderer


olympe.log.update_config({"loggers": {"olympe": {"level": "INFO"}}})

DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
DRONE_RTSP_PORT = os.environ.get("DRONE_RTSP_PORT")


class StreamingExample:
    def __init__(self):
        # Create the olympe.Drone object from its IP address
        self.drone = olympe.Drone(DRONE_IP)
        self.tempd = tempfile.mkdtemp(prefix="olympe_streaming_test_")
        print(f"Olympe streaming example output dir: {self.tempd}")
        self.h264_frame_stats = []
        self.h264_stats_file = open(os.path.join(self.tempd, "h264_stats.csv"), "w+")
        self.h264_stats_writer = csv.DictWriter(
            self.h264_stats_file, ["fps", "bitrate"]
        )
        self.h264_stats_writer.writeheader()
        self.frame_queue = queue.Queue()
        self.processing_thread = threading.Thread(target=self.yuv_frame_processing)
        self.renderer = None

    def start(self):
        # Connect to drone
        assert self.drone.connect(retry=3)

        if DRONE_RTSP_PORT is not None:
            self.drone.streaming.server_addr = f"{DRONE_IP}:{DRONE_RTSP_PORT}"

        # You can record the video stream from the drone if you plan to do some
        # post processing.
        # self.drone.streaming.set_output_files(
        #     video=os.path.join(self.tempd, "streaming.mp4"),
        #     metadata=os.path.join(self.tempd, "streaming_metadata.json"),
        # )

        # Setup your callback functions to do some live video processing
        self.drone.streaming.set_callbacks(
            raw_cb=self.yuv_frame_cb,
            h264_cb=self.h264_frame_cb,
            start_cb=self.start_cb,
            end_cb=self.end_cb,
            flush_raw_cb=self.flush_cb,
        )
        # Start video streaming
        self.drone.streaming.start()
        # self.renderer = PdrawRenderer(pdraw=self.drone.streaming)
        self.running = True
        self.processing_thread.start()

    def stop(self):
        self.running = False
        self.processing_thread.join()
        if self.renderer is not None:
            self.renderer.stop()
        # Properly stop the video stream and disconnect
        assert self.drone.streaming.stop()
        assert self.drone.disconnect()
        self.h264_stats_file.close()

    def yuv_frame_cb(self, yuv_frame):
        """
        This function will be called by Olympe for each decoded YUV frame.

            :type yuv_frame: olympe.VideoFrame
        """
        yuv_frame.ref()
        self.frame_queue.put_nowait(yuv_frame)

    def yuv_frame_processing(self):
        while self.running:
            try:
                yuv_frame = self.frame_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            # You should process your frames here and release (unref) them when you're done.
            # Don't hold a reference on your frames for too long to avoid memory leaks and/or memory
            # pool exhaustion.
            yuv_frame.unref()

    def flush_cb(self, stream):
        if stream["vdef_format"] != olympe.VDEF_I420:
            return True
        while not self.frame_queue.empty():
            self.frame_queue.get_nowait().unref()
        return True

    def start_cb(self):
        pass

    def end_cb(self):
        pass

    def h264_frame_cb(self, h264_frame):
        """
        This function will be called by Olympe for each new h264 frame.

            :type yuv_frame: olympe.VideoFrame
        """

        # Get a ctypes pointer and size for this h264 frame
        frame_pointer, frame_size = h264_frame.as_ctypes_pointer()

        # For this example we will just compute some basic video stream stats
        # (bitrate and FPS) but we could choose to resend it over an another
        # interface or to decode it with our preferred hardware decoder..

        # Compute some stats and dump them in a csv file
        info = h264_frame.info()
        print(f" H264")
        frame_ts = info["ntp_raw_timestamp"]
        if not bool(info["is_sync"]):
            while len(self.h264_frame_stats) > 0:
                start_ts, _ = self.h264_frame_stats[0]
                if (start_ts + 1e6) < frame_ts:
                    self.h264_frame_stats.pop(0)
                else:
                    break
            self.h264_frame_stats.append((frame_ts, frame_size))
            h264_fps = len(self.h264_frame_stats)
            h264_bitrate = 8 * sum(map(lambda t: t[1], self.h264_frame_stats))
            self.h264_stats_writer.writerow({"fps": h264_fps, "bitrate": h264_bitrate})

    def show_yuv_frame(self, window_name, yuv_frame):
        # the VideoFrame.info() dictionary contains some useful information
        # such as the video resolution
        info = yuv_frame.info()

        height, width = (  # noqa
            info["raw"]["frame"]["info"]["height"],
            info["raw"]["frame"]["info"]["width"],
        )
        # yuv_frame.vmeta()
        # yuv_frame.vmeta() returns a dictionary that contains additional
        # metadata from the drone (GPS coordinates, battery percentage, ...)

        # convert pdraw YUV flag to OpenCV YUV flag
        import cv2
        cv2_cvt_color_flag = {
            olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
            olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
        }[yuv_frame.format()]
        cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
        cv2.imshow("Frames via Olympe", cv2frame)
        cv2.waitKey(1)

    def fly(self):
        # Takeoff, fly, land, ...
        print("Takeoff if necessary...")
        self.drone(
            FlyingStateChanged(state="hovering", _policy="check")
            | FlyingStateChanged(state="flying", _policy="check")
            | (
                GPSFixStateChanged(fixed=1, _timeout=10, _policy="check_wait")
                >> (
                    TakeOff(_no_expect=True)
                    & FlyingStateChanged(
                        state="hovering", _timeout=10, _policy="check_wait"
                    )
                )
            )
        ).wait()
        maxtilt = self.drone.get_state(MaxTiltChanged)["max"]
        self.drone(MaxTilt(maxtilt)).wait()

        for i in range(4):
            print(f"Moving by ({i + 1}/4)...")
            self.drone(moveBy(10, 0, 0, math.pi, _timeout=20)).wait().success()

        print("Landing...")
        self.drone(Landing() >> FlyingStateChanged(state="landed", _timeout=5)).wait()
        print("Landed\n")

    def replay_with_vlc(self):
        # Replay this MP4 video file using VLC
        mp4_filepath = os.path.join(self.tempd, "streaming.mp4")
        subprocess.run(shlex.split(f"vlc --play-and-exit {mp4_filepath}"), check=True)


def test_streaming():
    streaming_example = StreamingExample()
    # Start the video stream
    streaming_example.start()
    # Perform some live video processing while the drone is flying
    # streaming_example.fly()
    time.sleep(20)
    # Stop the video stream
    streaming_example.stop()
    # Recorded video stream postprocessing
    streaming_example.replay_with_vlc()


if __name__ == "__main__":
    test_streaming()

I am using sphinx (Parrot-Sphinx simulator version 2.15.1) and olympe (7.0.2)
It returns :

Traceback (most recent call last):
  File "streaming2.py", line 214, in <module>
    test_streaming()
  File "streaming2.py", line 203, in test_streaming
    streaming_example.start()
  File "streaming2.py", line 66, in start
    self.drone.streaming.start()
  File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/video/pdraw.py", line 1135, in start
    return self.play(*args, **kwds)
  File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/video/pdraw.py", line 1219, in play
    return f.result_or_cancel(timeout=timeout)
  File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/utils/pomp_loop_thread.py", line 159, in result_or_cancel
    return self.result(timeout=timeout)
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 446, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError

Traceback (most recent call last):
  File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/arsdkng/cmd_itf.py", line 319, in _recv_cmd_cb
    res, message_args = message._decode_args(command)
  File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/arsdkng/messages.py", line 1212, in _decode_args
    decoded_args = list(
  File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/arsdkng/messages.py", line 1214, in <lambda>
    lambda t: cls.args_bitfield[t[0]](t[1])
  File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/arsdkng/enums.py", line 91, in __init__
    self._enums = list(map(self._enum_type_, self._bits_order(enums)))
  File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/aenum/__init__.py", line 2137, in __call__
    return cls.__new__(cls, value)
  File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/aenum/__init__.py", line 2502, in __new__
    raise ve_exc

I also tried this code both with sphinx and a real anafi ai drone :

import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing, moveTo, NavigateHome
import threading
import time
import queue
import cv2
import logging

olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}})

class OlympeStreaming(threading.Thread):
    def __init__(self, drone):
        self.drone = drone
        self.frame_queue = queue.Queue()
        self.flush_queue_lock = threading.Lock()
        self.frame_num = 0 
        self.renderer = None
        self.yuv_frame = []
        super().__init__()
        super().start()


    def start(self):
        # Setup your callback functions to do some live video processing
        #self.drone.streaming.set_callbacks(
          #  raw_cb=self.yuv_frame_cb,
         #   h264_cb=self.h264_frame_cb,
         #   start_cb=self.start_cb,
        #    end_cb=self.end_cb,
        #    flush_raw_cb=self.flush_cb,
        #)
        # Start video streaming
        Pdraw = olympe.Pdraw()
        Pdraw.play(url="rtsp://10.202.0.1/live")
        self.drone.streaming.start()
        #self.renderer = PdrawRenderer(pdraw=self.drone.streaming)

    def stop(self):
        if self.renderer is not None:
            self.renderer.stop()
        # Properly stop the video stream and disconnect
        self.drone.streaming.stop()

    def yuv_frame_cb(self, yuv_frame):
        """
        This function will be called by Olympe for each decoded YUV frame.
            :type yuv_frame: olympe.VideoFrame
        """
        yuv_frame.ref()
        self.frame_queue.put_nowait(yuv_frame)

    def flush_cb(self, stream):
        if stream["vdef_format"] != olympe.VDEF_I420:
            return True
        with self.flush_queue_lock:
            while not self.frame_queue.empty():
                self.frame_queue.get_nowait().unref()
        return True

    def start_cb(self):
        pass

    def end_cb(self):
        pass

    def h264_frame_cb(self, h264_frame):
        pass

    def display_frame(self, yuv_frame):
        # the VideoFrame.info() dictionary contains some useful information
        # such as the video resolution
        info = yuv_frame.info()

        height, width = (  # noqa
            info["raw"]["frame"]["info"]["height"],
            info["raw"]["frame"]["info"]["width"],
        )

        # yuv_frame.vmeta() returns a dictionary that contains additional
        # metadata from the drone (GPS coordinates, battery percentage, ...)
        # convert pdraw YUV flag to OpenCV YUV flag
        cv2_cvt_color_flag = {
            olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
            olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
        }[yuv_frame.format()]

        # yuv_frame.as_ndarray() is a 2D numpy array with the proper "shape"
        # i.e (3 * height / 2, width) because it's a YUV I420 or NV12 frame

        # Use OpenCV to convert the yuv frame to RGB
        cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
        img = cv2.imread('/home/unmanned/Pictures/Screenshot from 2025-01-23 15-27-19.png', cv2.IMREAD_GRAYSCALE)
        cv2.imshow("Frames via Olympe", img)
        cv2.waitKey(1)

    def run(self):
        # print('AHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH')
        main_thread = next(
            filter(lambda t: t.name == "MainThread", threading.enumerate())
        )
        c=0
        while main_thread.is_alive():
            with self.flush_queue_lock:
                while c<=200:
                    try:
                        # print('GET FRAMEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE')
                        c=c+1
                        yuv_frame = self.frame_queue.get(timeout=0.01)
                    
                    except queue.Empty:
                        # print('EMPTYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY')
                        continue
                try:
                    # print('DISPLAYYYYYYYYYYYYYYYYYYYYYYYYYYYYY')
                    print(f'{len(yuv_frame)}')
                    # self.display_frame(yuv_frame)
                except Exception as e:
                    # print('EXCEPTIIOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOON')
                    print(e)
                finally:
                    # Don't forget to unref the yuv frame. We don't want to
                    # starve the video buffer pool
                    yuv_frame.unref()



logger = logging.getLogger(__name__)

if __name__ == "__main__":
        
    #eventually IP will be specified depending on what drone is chosen
    IP = "10.202.0.1"
    drone = olympe.Drone(IP)
    drone.connect()
    drone(TakeOff()).wait().success()
    
    
    streamer = OlympeStreaming(drone)
    streamer.start()
    # streamer.run()
    ### Flight commands here ###
    time.sleep(30)
    
    streamer.stop()
     
    drone(Landing()).wait().success()
    drone.disconnect()

But his one keeps returning :

local variable 'yuv_frame' referenced before assignment
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "custom_streaming.py", line 123, in run
    yuv_frame.unref()
UnboundLocalError: local variable 'yuv_frame' referenced before assignment

Olympe manages to connect to the drone.

Does anyone has any idea of what is going on ? What am I doing wrong ?
Note : I can subscribe to the drone stream (both with sphinx and real drone)

Your second error case is because you’re using yuv_frame as a local variable in run(). I feel like you’d like to use self.yuv_frame instead, or initialize it among the scope of your run() function if you want to keep it local

Regarding your first error case, your timeout comes from the fact the streaming is not started. The reason why seems not crystal clear for me, I’ve noticed you’ve changed few lines from the original example ; have you tried by plain copy-pasting it to see how it goes?
This is an old version of Olympe streaming that has been widely reworked since. But it was still tested since recently and we had not experienced such issues

Hello, thanks a lot, i finally managed to make it work using PDraw without olympe and it works fine !

1 Like

Glad to hear you’re not stuck anymore, streaming use will be easier with the next brand-new version of Olympe I hope :slight_smile:

This topic was automatically closed 3 days after the last reply. New replies are no longer allowed.