Hi all,
I wanted to follow up on my recent post regarding indoor navigation with the ANAFI Ai. I’ve now implemented a basic obstacle avoidance system using the front-facing camera and OpenCV. The logic is simple:
- The video feed is split into left, center, and right zones.
- I apply Canny edge detection and contour analysis to detect large objects in each zone.
- If an obstacle is detected in the center zone, the drone checks the left and right zones:
- If the left is clear, it sidesteps left using
moveBy()
.
- If the right is clear, it sidesteps right.
- If both sides are blocked, it ascends slightly to avoid the obstacle vertically.
- If no obstacle is detected, the drone moves forward.
This was tested indoors in a static environment. I know this is a very lightweight approach compared to full SLAM or depth-mapping, but I wanted to start simple.
Is this a valid foundation for indoor obstacle avoidance with ANAFI Ai?
Would anyone recommend a better or more robust approach, especially in environments with dynamic or more complex layouts?
Appreciate any feedback or suggestions!

import argparse
import olympe
import os
import re
import sys
import cv2
import time
from olympe.video.pdraw import Pdraw, PdrawState
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing
from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
DRONE_IP = os.environ.get("DRONE_IP", "192.168.42.1")
DRONE_RTSP_PORT = os.environ.get("DRONE_RTSP_PORT", "554")
# Simple edge-detection based obstacle detection by region
def detect_obstacle_regions(frame):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
blur = cv2.GaussianBlur(gray, (5, 5), 0)
edges = cv2.Canny(blur, 50, 150)
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
height, width = frame.shape[:2]
left_zone = (0, int(width * 0.3))
center_zone = (int(width * 0.35), int(width * 0.65))
right_zone = (int(width * 0.7), width)
obstacle_left = obstacle_center = obstacle_right = False
for cnt in contours:
area = cv2.contourArea(cnt)
if area > 1500: # Only large objects are considered obstacles
x, y, w, h = cv2.boundingRect(cnt)
cx = x + w // 2
if left_zone[0] <= cx < left_zone[1]:
obstacle_left = True
elif center_zone[0] <= cx < center_zone[1]:
obstacle_center = True
elif right_zone[0] <= cx < right_zone[1]:
obstacle_right = True
return obstacle_left, obstacle_center, obstacle_right
def main(argv):
parser = argparse.ArgumentParser(description="Drone Obstacle Avoidance Test")
parser.add_argument(
"-u", "--url",
default=f"rtsp://{DRONE_IP}:{DRONE_RTSP_PORT}/live",
help="RTSP URL for drone video stream"
)
parser.add_argument("-m", "--media-name", default="Front camera")
args = parser.parse_args(argv)
# Extract drone IP
drone_ip = re.search(r"\d+\.\d+\.\d+\.\d+", args.url)
if not drone_ip:
print("Invalid drone IP")
return
# Connect to drone
drone = olympe.Drone(drone_ip.group())
if not drone.connect():
print("Failed to connect to drone")
return
# Start video stream
pdraw = Pdraw()
pdraw.play(url=args.url, media_name=args.media_name)
if not pdraw.wait(PdrawState.Playing, timeout=5):
print("Failed to start video stream")
drone.disconnect()
pdraw.destroy()
return
cap = cv2.VideoCapture(args.url, cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)
try:
# Takeoff and hover
print("Taking off...")
if not drone(TakeOff() >> FlyingStateChanged(state="hovering", _timeout=10)).wait().success():
print("Takeoff failed")
return
print("Drone hovering. Obstacle detection running...")
while True:
ret, frame = cap.read()
if not ret:
print("Failed to grab frame")
continue
obs_left, obs_center, obs_right = detect_obstacle_regions(frame)
if obs_center:
print("Obstacle ahead!")
if not obs_left:
print("Turning left → moveBy(0, -0.3, 0, 0)")
drone(moveBy(0, -0.3, 0, 0) >> FlyingStateChanged(state="hovering", _timeout=5)).wait()
elif not obs_right:
print("Turning right → moveBy(0, 0.3, 0, 0)")
drone(moveBy(0, 0.3, 0, 0) >> FlyingStateChanged(state="hovering", _timeout=5)).wait()
else:
print("Surrounded → move up → moveBy(0, 0, -0.3, 0)")
drone(moveBy(0, 0, -0.3, 0) >> FlyingStateChanged(state="hovering", _timeout=5)).wait()
else:
print("No obstacle → move forward → moveBy(0.3, 0, 0, 0)")
drone(moveBy(0.3, 0, 0, 0) >> FlyingStateChanged(state="hovering", _timeout=5)).wait()
# Visualize frame
cv2.imshow("Drone Feed", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
print("Landing...")
drone(Landing() >> FlyingStateChanged(state="landed", _timeout=10)).wait()
cap.release()
cv2.destroyAllWindows()
pdraw.close()
drone.disconnect()
pdraw.wait(PdrawState.Closed, timeout=5)
pdraw.destroy()
print("Cleanup complete.")
if __name__ == "__main__":
main(sys.argv[1:])