Description:
I’m working with an ANAFI Ai and trying to scan QR codes from the live video feed using OpenCV + Olympe. The same script worked on another drone.
What I’ve tried:
- RTSP via OpenCV/FFMPEG:
Using `cv2.VideoCapture("rtsp://192.168.42.1:554/live", cv2.CAP_FFMPEG)`
times out after ~30 seconds with:
VIDEOIO(FFMPEG): backend is generally available but can't be used to capture by name
Stream timeout triggered after 30000 ms
- RTSP via GStreamer (`gst-launch-1.0`):
Command:
gst-launch-1.0 -v rtspsrc location=rtsp://192.168.42.1:554/live protocols=tcp latency=200 ! \
rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! autovideosink
Fails with: `Could not receive message. (Timeout while waiting for server response)`.
- Olympe streaming (`drone.streaming.get_latest_frame()`):
After enabling media streaming with:
drone(VideoEnable(1)).wait()
drone(VideoEnableChanged(enabled=1)).wait(_timeout=5)
drone.streaming.start()
I get an `AttributeError: 'Pdraw' object has no attribute 'is_running'` if I check `.is_running`.
If I drop that and call `.start()` directly, I don’t get errors, but I also don’t receive frames (the window stays black / `get_latest_frame()` returns `None`).
Environment:
- Linux (Ubuntu with conda Python 3.12)
- Olympe installed from pip
- OpenCV 4.x with FFMPEG and GStreamer enabled
- Connected via Wi-Fi direct (legacy mode shows in logs)
Logs:
- RTSP: Timeout during SETUP stage (`Could not receive message`)
- Olympe: Connection fine, telemetry received, but no video frames.
My code:
import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, Landing, moveBy
from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
from olympe.messages.ardrone3.PilotingSettings import MaxTilt
import time
import threading
import cv2
import keyboard
import os
import numpy as np
# Drone address and RTSP port. Each can be overridden through the environment
# variable of the same name; otherwise the default shown here is used.
DRONE_IP = os.environ.get("DRONE_IP", "192.168.42.1")
DRONE_RTSP_PORT = os.environ.get("DRONE_RTSP_PORT", "554")
# URL of the live video stream opened by scan_qr_for_20_seconds().
RTSP_URL = f"rtsp://{DRONE_IP}:{DRONE_RTSP_PORT}/live"
# Flags shared between threads: set by keyboard_listener(), read by
# safe_move() (both flags) and by the QR scan loop (land flag only).
drone_should_land = False
drone_should_hover = False
def keyboard_listener():
    """Poll the keyboard and raise the shared land/hover flags.

    Meant to run in a background thread. Pressing "q" sets
    ``drone_should_land`` and ends the listener; pressing "s" sets
    ``drone_should_hover`` and the listener keeps running so that a later
    "q" is still noticed.
    """
    global drone_should_land, drone_should_hover
    while True:
        if keyboard.is_pressed("q"):
            print("[KEYBOARD] Emergency land triggered (q pressed)!")
            drone_should_land = True
            break
        if keyboard.is_pressed("s") and not drone_should_hover:
            # Report the hover request once rather than on every poll while
            # the key is held down (the flag is never cleared afterwards).
            print("[KEYBOARD] Hover triggered (s pressed)!")
            drone_should_hover = True
        # Yield between polls: without this the loop spins at full speed and
        # keeps one CPU core busy for the whole flight.
        time.sleep(0.05)
def safe_move(drone, dx, dy, dz, dpsi):
    """Run a relative ``moveBy`` unless a land or hover request is pending.

    Args:
        drone: Object that is called with an expectation and whose result
            exposes ``wait()``.
        dx, dy, dz, dpsi: Arguments forwarded unchanged to ``moveBy``.

    When ``drone_should_land`` is set the move is skipped. When
    ``drone_should_hover`` is set the function waits for the hovering state
    and then blocks until a land request arrives. Otherwise the move is sent
    and a message is printed if it does not succeed.
    """
    # The module-level flags are only read here, so no ``global`` is needed.
    if drone_should_land:
        print("[MOVE] Land requested. Skipping movement.")
        return

    if drone_should_hover:
        print("[MOVE] Hover requested. Holding position...")
        hovering = FlyingStateChanged(state="hovering", _timeout=5)
        drone(hovering).wait()
        # Stay here until the keyboard thread asks for a landing.
        while True:
            if drone_should_land:
                return
            time.sleep(0.5)

    print(f"[MOVE] Executing moveBy: dx={dx}, dy={dy}, dz={dz}, dpsi={dpsi}")
    move_then_hover = moveBy(dx, dy, dz, dpsi) >> FlyingStateChanged(
        state="hovering", _timeout=10
    )
    outcome = drone(move_then_hover).wait()
    if outcome.success():
        return
    print("[MOVE] Movement failed!")
def scan_qr_for_20_seconds(drone):
    """Watch the RTSP stream for up to 20 seconds and decode a QR code.

    Opens ``RTSP_URL`` with OpenCV's FFMPEG backend, shows each frame in a
    debug window with any detected QR outline drawn on it, and stops at the
    first decoded payload, when "q" is pressed in the window, when
    ``drone_should_land`` is set, or when the time limit expires.

    Args:
        drone: Unused here; kept so existing callers keep working.

    Returns:
        A list of ``(payload, seconds_since_scan_start)`` tuples. It is empty
        when the stream cannot be opened or nothing was decoded.
    """
    print("[QR] Connecting to drone video stream...")
    cap = cv2.VideoCapture(RTSP_URL, cv2.CAP_FFMPEG)
    qr_found = []
    # try/finally so the capture and the debug window are released on every
    # exit path, including a failed open or an exception inside the loop.
    try:
        if not cap.isOpened():
            print("[QR] Failed to open video stream")
            return qr_found
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)

        detector = cv2.QRCodeDetector()
        # Monotonic clock: the deadline must not move if the wall clock is
        # adjusted while scanning.
        start_time = time.monotonic()
        while time.monotonic() - start_time < 20:
            # Checked before reading so a land request is honoured even when
            # the stream delivers no frames.
            if drone_should_land:
                break

            ret, frame = cap.read()
            if not ret:
                # Back off briefly instead of spinning on a stalled stream.
                time.sleep(0.01)
                continue

            data, bbox, _ = detector.detectAndDecode(frame)
            if bbox is not None and len(bbox) > 0:
                # Plain Python ints: the drawing calls take integer pixel
                # coordinates, and the detector returns floats.
                corners = [(int(x), int(y)) for x, y in bbox[0]]
                for idx, pt1 in enumerate(corners):
                    pt2 = corners[(idx + 1) % len(corners)]
                    cv2.line(frame, pt1, pt2, (0, 255, 0), 2)
                if data:
                    label_x, label_y = corners[0]
                    cv2.putText(frame, data, (label_x, label_y - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            if data and all(data != seen for seen, _ in qr_found):
                print(f"[QR] QR Code detected: {data}")
                qr_found.append((data, time.monotonic() - start_time))
                break

            # Show live drone video feed with QR overlay
            cv2.imshow("Drone QR Debug View", frame)
            if cv2.waitKey(1) & 0xFF == ord("q") or drone_should_land:
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
    return qr_found
def main():
    """Connect to the drone, run one QR scan, report the result, disconnect.

    Starts ``keyboard_listener`` in a daemon thread so a land request can
    interrupt the scan. Returns early, without scanning, when the connection
    to the drone cannot be established.
    """
    print("[MAIN] Connecting to drone...")
    drone = olympe.Drone(DRONE_IP)
    # connect() reports failure through its return value; continuing after a
    # failed connection would only produce confusing errors further down.
    if not drone.connect():
        print("[MAIN] Failed to connect to drone.")
        return

    # try/finally so the connection is closed even if scanning raises.
    try:
        print("[MAIN] Starting keyboard listener...")
        listener_thread = threading.Thread(target=keyboard_listener, daemon=True)
        listener_thread.start()

        # Step 5: Scan for QR
        print("[MAIN] Scanning for QR code...")
        qr_found = scan_qr_for_20_seconds(drone)
        if qr_found:
            print(f"[MAIN] QR code(s) detected: {qr_found}")
        else:
            print("[MAIN] No QR code found within time limit.")

        # NOTE(review): no take-off or landing command is sent in this
        # function, so this message does not reflect an actual landing.
        print("[MAIN] Drone landed.")
    finally:
        drone.disconnect()


if __name__ == "__main__":
    main()