Hello there, I am struggling to use the streaming.py example from the Olympe documentation (olympe/src/olympe/doc/examples/streaming.py at master · Parrot-Developers/olympe · GitHub)
Here is the code I use:
import csv
import math
import os
import queue
import shlex
import subprocess
import tempfile
import threading
import time
import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, Landing
from olympe.messages.ardrone3.Piloting import moveBy
from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
from olympe.messages.ardrone3.PilotingSettings import MaxTilt
from olympe.messages.ardrone3.PilotingSettingsState import MaxTiltChanged
from olympe.messages.ardrone3.GPSSettingsState import GPSFixStateChanged
from olympe.video.renderer import PdrawRenderer
olympe.log.update_config({"loggers": {"olympe": {"level": "INFO"}}})
DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
DRONE_RTSP_PORT = os.environ.get("DRONE_RTSP_PORT")
class StreamingExample:
    def __init__(self):
        # Create the olympe.Drone object from its IP address
        self.drone = olympe.Drone(DRONE_IP)
        self.tempd = tempfile.mkdtemp(prefix="olympe_streaming_test_")
        print(f"Olympe streaming example output dir: {self.tempd}")
        self.h264_frame_stats = []
        self.h264_stats_file = open(os.path.join(self.tempd, "h264_stats.csv"), "w+")
        self.h264_stats_writer = csv.DictWriter(
            self.h264_stats_file, ["fps", "bitrate"]
        )
        self.h264_stats_writer.writeheader()
        self.frame_queue = queue.Queue()
        self.processing_thread = threading.Thread(target=self.yuv_frame_processing)
        self.renderer = None

    def start(self):
        # Connect to drone
        assert self.drone.connect(retry=3)

        if DRONE_RTSP_PORT is not None:
            self.drone.streaming.server_addr = f"{DRONE_IP}:{DRONE_RTSP_PORT}"

        # You can record the video stream from the drone if you plan to do some
        # post processing.
        # self.drone.streaming.set_output_files(
        #     video=os.path.join(self.tempd, "streaming.mp4"),
        #     metadata=os.path.join(self.tempd, "streaming_metadata.json"),
        # )

        # Setup your callback functions to do some live video processing
        self.drone.streaming.set_callbacks(
            raw_cb=self.yuv_frame_cb,
            h264_cb=self.h264_frame_cb,
            start_cb=self.start_cb,
            end_cb=self.end_cb,
            flush_raw_cb=self.flush_cb,
        )
        # Start video streaming
        self.drone.streaming.start()
        # self.renderer = PdrawRenderer(pdraw=self.drone.streaming)
        self.running = True
        self.processing_thread.start()

    def stop(self):
        self.running = False
        self.processing_thread.join()
        if self.renderer is not None:
            self.renderer.stop()
        # Properly stop the video stream and disconnect
        assert self.drone.streaming.stop()
        assert self.drone.disconnect()
        self.h264_stats_file.close()

    def yuv_frame_cb(self, yuv_frame):
        """
        This function will be called by Olympe for each decoded YUV frame.
        :type yuv_frame: olympe.VideoFrame
        """
        yuv_frame.ref()
        self.frame_queue.put_nowait(yuv_frame)

    def yuv_frame_processing(self):
        while self.running:
            try:
                yuv_frame = self.frame_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            # You should process your frames here and release (unref) them when you're done.
            # Don't hold a reference on your frames for too long to avoid memory leaks and/or memory
            # pool exhaustion.
            yuv_frame.unref()

    def flush_cb(self, stream):
        if stream["vdef_format"] != olympe.VDEF_I420:
            return True
        while not self.frame_queue.empty():
            self.frame_queue.get_nowait().unref()
        return True

    def start_cb(self):
        pass

    def end_cb(self):
        pass

    def h264_frame_cb(self, h264_frame):
        """
        This function will be called by Olympe for each new h264 frame.
        :type yuv_frame: olympe.VideoFrame
        """
        # Get a ctypes pointer and size for this h264 frame
        frame_pointer, frame_size = h264_frame.as_ctypes_pointer()

        # For this example we will just compute some basic video stream stats
        # (bitrate and FPS) but we could choose to resend it over an another
        # interface or to decode it with our preferred hardware decoder..

        # Compute some stats and dump them in a csv file
        info = h264_frame.info()
        print(f" H264")
        frame_ts = info["ntp_raw_timestamp"]
        if not bool(info["is_sync"]):
            while len(self.h264_frame_stats) > 0:
                start_ts, _ = self.h264_frame_stats[0]
                if (start_ts + 1e6) < frame_ts:
                    self.h264_frame_stats.pop(0)
                else:
                    break
            self.h264_frame_stats.append((frame_ts, frame_size))
            h264_fps = len(self.h264_frame_stats)
            h264_bitrate = 8 * sum(map(lambda t: t[1], self.h264_frame_stats))
            self.h264_stats_writer.writerow({"fps": h264_fps, "bitrate": h264_bitrate})

    def show_yuv_frame(self, window_name, yuv_frame):
        # the VideoFrame.info() dictionary contains some useful information
        # such as the video resolution
        info = yuv_frame.info()
        height, width = (  # noqa
            info["raw"]["frame"]["info"]["height"],
            info["raw"]["frame"]["info"]["width"],
        )
        # yuv_frame.vmeta() returns a dictionary that contains additional
        # metadata from the drone (GPS coordinates, battery percentage, ...)

        # convert pdraw YUV flag to OpenCV YUV flag
        import cv2

        cv2_cvt_color_flag = {
            olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
            olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
        }[yuv_frame.format()]
        cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
        cv2.imshow("Frames via Olympe", cv2frame)
        cv2.waitKey(1)

    def fly(self):
        # Takeoff, fly, land, ...
        print("Takeoff if necessary...")
        self.drone(
            FlyingStateChanged(state="hovering", _policy="check")
            | FlyingStateChanged(state="flying", _policy="check")
            | (
                GPSFixStateChanged(fixed=1, _timeout=10, _policy="check_wait")
                >> (
                    TakeOff(_no_expect=True)
                    & FlyingStateChanged(
                        state="hovering", _timeout=10, _policy="check_wait"
                    )
                )
            )
        ).wait()
        maxtilt = self.drone.get_state(MaxTiltChanged)["max"]
        self.drone(MaxTilt(maxtilt)).wait()

        for i in range(4):
            print(f"Moving by ({i + 1}/4)...")
            self.drone(moveBy(10, 0, 0, math.pi, _timeout=20)).wait().success()

        print("Landing...")
        self.drone(Landing() >> FlyingStateChanged(state="landed", _timeout=5)).wait()
        print("Landed\n")

    def replay_with_vlc(self):
        # Replay this MP4 video file using VLC
        mp4_filepath = os.path.join(self.tempd, "streaming.mp4")
        subprocess.run(shlex.split(f"vlc --play-and-exit {mp4_filepath}"), check=True)


def test_streaming():
    streaming_example = StreamingExample()
    # Start the video stream
    streaming_example.start()
    # Perform some live video processing while the drone is flying
    # streaming_example.fly()
    time.sleep(20)
    # Stop the video stream
    streaming_example.stop()
    # Recorded video stream postprocessing
    streaming_example.replay_with_vlc()


if __name__ == "__main__":
    test_streaming()
I am using Sphinx (Parrot-Sphinx simulator, version 2.15.1) and Olympe 7.0.2.
It returns:
Traceback (most recent call last):
File "streaming2.py", line 214, in <module>
test_streaming()
File "streaming2.py", line 203, in test_streaming
streaming_example.start()
File "streaming2.py", line 66, in start
self.drone.streaming.start()
File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/video/pdraw.py", line 1135, in start
return self.play(*args, **kwds)
File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/video/pdraw.py", line 1219, in play
return f.result_or_cancel(timeout=timeout)
File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/utils/pomp_loop_thread.py", line 159, in result_or_cancel
return self.result(timeout=timeout)
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 446, in result
raise TimeoutError()
concurrent.futures._base.TimeoutError
Traceback (most recent call last):
File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/arsdkng/cmd_itf.py", line 319, in _recv_cmd_cb
res, message_args = message._decode_args(command)
File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/arsdkng/messages.py", line 1212, in _decode_args
decoded_args = list(
File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/arsdkng/messages.py", line 1214, in <lambda>
lambda t: cls.args_bitfield[t[0]](t[1])
File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/olympe/arsdkng/enums.py", line 91, in __init__
self._enums = list(map(self._enum_type_, self._bits_order(enums)))
File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/aenum/__init__.py", line 2137, in __call__
return cls.__new__(cls, value)
File "/home/Parrot/olympe-venv/lib/python3.8/site-packages/aenum/__init__.py", line 2502, in __new__
raise ve_exc
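So the drone connects, but streaming.start() (which, according to the traceback, ends up waiting on the result of play()) times out. My understanding is that the stream can also be opened with a standalone Pdraw object, which is what my second attempt below is built on. Here is a minimal sketch of that idea, assuming the Sphinx front camera is served at rtsp://10.202.0.1/live (the URL used in the code below) and that Pdraw exposes close()/dispose() as in the Olympe video documentation examples:

import time
import olympe

# Standalone Pdraw playback sketch (not the full example above): open the
# RTSP stream directly, let it run for a few seconds, then tear it down.
pdraw = olympe.Pdraw()
pdraw.play(url="rtsp://10.202.0.1/live")
time.sleep(10)
pdraw.close()    # assumed to stop playback, as in the Olympe video docs
pdraw.dispose()  # assumed to release the Pdraw instance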
I also tried the following code, both with Sphinx and a real ANAFI Ai drone:
import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing, moveTo, NavigateHome
import threading
import time
import queue
import cv2
import logging
olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}})
class OlympeStreaming(threading.Thread):
    def __init__(self, drone):
        self.drone = drone
        self.frame_queue = queue.Queue()
        self.flush_queue_lock = threading.Lock()
        self.frame_num = 0
        self.renderer = None
        self.yuv_frame = []
        super().__init__()
        super().start()

    def start(self):
        # Setup your callback functions to do some live video processing
        # self.drone.streaming.set_callbacks(
        #     raw_cb=self.yuv_frame_cb,
        #     h264_cb=self.h264_frame_cb,
        #     start_cb=self.start_cb,
        #     end_cb=self.end_cb,
        #     flush_raw_cb=self.flush_cb,
        # )
        # Start video streaming
        Pdraw = olympe.Pdraw()
        Pdraw.play(url="rtsp://10.202.0.1/live")
        self.drone.streaming.start()
        # self.renderer = PdrawRenderer(pdraw=self.drone.streaming)

    def stop(self):
        if self.renderer is not None:
            self.renderer.stop()
        # Properly stop the video stream and disconnect
        self.drone.streaming.stop()

    def yuv_frame_cb(self, yuv_frame):
        """
        This function will be called by Olympe for each decoded YUV frame.
        :type yuv_frame: olympe.VideoFrame
        """
        yuv_frame.ref()
        self.frame_queue.put_nowait(yuv_frame)

    def flush_cb(self, stream):
        if stream["vdef_format"] != olympe.VDEF_I420:
            return True
        with self.flush_queue_lock:
            while not self.frame_queue.empty():
                self.frame_queue.get_nowait().unref()
        return True

    def start_cb(self):
        pass

    def end_cb(self):
        pass

    def h264_frame_cb(self, h264_frame):
        pass

    def display_frame(self, yuv_frame):
        # the VideoFrame.info() dictionary contains some useful information
        # such as the video resolution
        info = yuv_frame.info()
        height, width = (  # noqa
            info["raw"]["frame"]["info"]["height"],
            info["raw"]["frame"]["info"]["width"],
        )
        # yuv_frame.vmeta() returns a dictionary that contains additional
        # metadata from the drone (GPS coordinates, battery percentage, ...)

        # convert pdraw YUV flag to OpenCV YUV flag
        cv2_cvt_color_flag = {
            olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
            olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
        }[yuv_frame.format()]
        # yuv_frame.as_ndarray() is a 2D numpy array with the proper "shape"
        # i.e (3 * height / 2, width) because it's a YUV I420 or NV12 frame
        # Use OpenCV to convert the yuv frame to RGB
        cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
        img = cv2.imread('/home/unmanned/Pictures/Screenshot from 2025-01-23 15-27-19.png', cv2.IMREAD_GRAYSCALE)
        cv2.imshow("Frames via Olympe", img)
        cv2.waitKey(1)
    def run(self):
        # print('AHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH')
        main_thread = next(
            filter(lambda t: t.name == "MainThread", threading.enumerate())
        )
        c = 0
        while main_thread.is_alive():
            with self.flush_queue_lock:
                while c <= 200:
                    try:
                        # print('GET FRAMEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE')
                        c = c + 1
                        yuv_frame = self.frame_queue.get(timeout=0.01)
                    except queue.Empty:
                        # print('EMPTYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY')
                        continue
                try:
                    # print('DISPLAYYYYYYYYYYYYYYYYYYYYYYYYYYYYY')
                    print(f'{len(yuv_frame)}')
                    # self.display_frame(yuv_frame)
                except Exception as e:
                    # print('EXCEPTIIOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOON')
                    print(e)
                finally:
                    # Don't forget to unref the yuv frame. We don't want to
                    # starve the video buffer pool
                    yuv_frame.unref()
logger = logging.getLogger(__name__)
if __name__ == "__main__":
    # eventually IP will be specified depending on what drone is chosen
    IP = "10.202.0.1"
    drone = olympe.Drone(IP)
    drone.connect()
    drone(TakeOff()).wait().success()

    streamer = OlympeStreaming(drone)
    streamer.start()
    # streamer.run()

    ### Flight commands here ###
    time.sleep(30)

    streamer.stop()

    drone(Landing()).wait().success()
    drone.disconnect()
But this one keeps returning:
local variable 'yuv_frame' referenced before assignment
Exception in thread Thread-5:
Traceback (most recent call last):
File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "custom_streaming.py", line 123, in run
yuv_frame.unref()
UnboundLocalError: local variable 'yuv_frame' referenced before assignment
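If I read this correctly, the inner while c <= 200 loop can finish without frame_queue.get() ever returning a frame (every attempt hits queue.Empty and continues, presumably because set_callbacks() is commented out, so nothing ever feeds the queue), and the display/finally block then references yuv_frame while it is still unbound. Below is a sketch of the fetch-and-release pattern I think is intended, using a hypothetical helper process_one_frame that only unref()s a frame it actually obtained:

import queue

def process_one_frame(frame_queue, handle_frame):
    # Hypothetical helper (not part of Olympe): pop at most one frame and
    # make sure unref() is only called on a frame that was really fetched,
    # so the name can never be referenced while unbound.
    try:
        yuv_frame = frame_queue.get(timeout=0.01)
    except queue.Empty:
        return  # nothing arrived this round, nothing to release
    try:
        handle_frame(yuv_frame)
    finally:
        # Release the reference taken in yuv_frame_cb() so the video
        # buffer pool is not starved.
        yuv_frame.unref()

In run() this would be called once per iteration of the outer while main_thread.is_alive() loop.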
Olympe manages to connect to the drone.
Does anyone have any idea of what is going on? What am I doing wrong?
Note: I can subscribe to the drone stream (both with Sphinx and the real drone).
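For completeness, the part of the first example that fails, per the traceback, boils down to this sequence (the same calls as in the full script above, shown here only to make the question concrete):

import os
import olympe

DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")

# Condensed view of the failing path from the first traceback: connect()
# succeeds, but streaming.start() ends in a concurrent.futures TimeoutError.
drone = olympe.Drone(DRONE_IP)
assert drone.connect(retry=3)
drone.streaming.start()  # <- this is the call that times out
drone.streaming.stop()
drone.disconnect()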