Accessing ReST API and RTSP stream through Skycontroller

Hi,

I currently have a working system for capturing the RTSP stream of the Anafi Ai and for accessing the ReST API to capture media when connected directly to the drone via WiFi. I can also access the stream via the micro-HDMI port of the SkyController.

However, when I try to use my Olympe-PDrAW script to access the stream using the 192.168.53.1 address of the SkyController, I receive a “Connection reset by peer” error message.

I also receive this message when trying to access the drone’s ReST API through the SkyController at 192.168.53.1:180, which is the method Parrot suggested in previous posts.

Are you able to confirm whether:

  1. The user should be able to access the RTSP stream with the same code through the SkyController IP.
  2. It is possible to access the drone’s ReST API via the SkyController.

A separate but related issue: I see there is a Cellular feature included in Olympe 7.7. Is it possible to access the drone over the internet with a public-facing IP? I cannot find any comprehensive documentation on how to use this.

I am looking for a solution to the same problem as you, @Dan1. I would also like to know whether it’s possible to access the drone over the internet with a public-facing IP.

Please help.

Hi,

Could you please test the following streaming example using the olympe.SkyController class with Olympe 7.7.1?

import csv
import os
import queue
import tempfile
import threading
import time

import olympe
from olympe.video.renderer import PdrawRenderer


olympe.log.update_config({"loggers": {"olympe": {"level": "INFO"}}})

SKYCTRL_IP = os.environ.get("SKYCTRL_IP", "192.168.53.1")


class StreamingExample:
    def __init__(self):
        # Create the olympe.SkyController object from its IP address
        self.skyctrl = olympe.SkyController(SKYCTRL_IP)
        self.tempd = tempfile.mkdtemp(prefix="olympe_streaming_test_")
        print(f"Olympe streaming example output dir: {self.tempd}")
        self.h264_frame_stats = []
        self.h264_stats_file = open(os.path.join(self.tempd, "h264_stats.csv"), "w+")
        self.h264_stats_writer = csv.DictWriter(
            self.h264_stats_file, ["fps", "bitrate"]
        )
        self.h264_stats_writer.writeheader()
        self.frame_queue = queue.Queue()
        self.processing_thread = threading.Thread(target=self.yuv_frame_processing)
        self.renderer = None

    def start(self):
        # Connect to the SkyController (which relays to the drone)
        assert self.skyctrl.connect(retry=3)

        # You can record the video stream from the drone if you plan to do some
        # post processing.
        self.skyctrl.streaming.set_output_files(
            video=os.path.join(self.tempd, "streaming.mp4"),
            metadata=os.path.join(self.tempd, "streaming_metadata.json"),
        )

        # Setup your callback functions to do some live video processing
        self.skyctrl.streaming.set_callbacks(
            raw_cb=self.yuv_frame_cb,
            h264_cb=self.h264_frame_cb,
            start_cb=self.start_cb,
            end_cb=self.end_cb,
            flush_raw_cb=self.flush_cb,
        )
        # Start video streaming
        self.skyctrl.streaming.start()
        self.renderer = PdrawRenderer(pdraw=self.skyctrl.streaming)
        self.running = True
        self.processing_thread.start()

    def stop(self):
        self.running = False
        self.processing_thread.join()
        if self.renderer is not None:
            self.renderer.stop()
        # Properly stop the video stream and disconnect
        assert self.skyctrl.streaming.stop()
        assert self.skyctrl.disconnect()
        self.h264_stats_file.close()

    def yuv_frame_cb(self, yuv_frame):
        """
        This function will be called by Olympe for each decoded YUV frame.

            :type yuv_frame: olympe.VideoFrame
        """
        yuv_frame.ref()
        self.frame_queue.put_nowait(yuv_frame)

    def yuv_frame_processing(self):
        while self.running:
            try:
                yuv_frame = self.frame_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            # You should process your frames here and release (unref) them when you're done.
            # Don't hold a reference on your frames for too long to avoid memory leaks and/or memory
            # pool exhaustion.
            yuv_frame.unref()

    def flush_cb(self, stream):
        if stream["vdef_format"] != olympe.VDEF_I420:
            return True
        while not self.frame_queue.empty():
            self.frame_queue.get_nowait().unref()
        return True

    def start_cb(self):
        pass

    def end_cb(self):
        pass

    def h264_frame_cb(self, h264_frame):
        """
        This function will be called by Olympe for each new h264 frame.

            :type h264_frame: olympe.VideoFrame
        """

        # Get a ctypes pointer and size for this h264 frame
        frame_pointer, frame_size = h264_frame.as_ctypes_pointer()

        # For this example we will just compute some basic video stream stats
        # (bitrate and FPS) but we could choose to resend it over another
        # interface or to decode it with our preferred hardware decoder.

        # Compute some stats and dump them in a csv file
        info = h264_frame.info()
        frame_ts = info["ntp_raw_timestamp"]
        if not bool(info["is_sync"]):
            while len(self.h264_frame_stats) > 0:
                start_ts, _ = self.h264_frame_stats[0]
                if (start_ts + 1e6) < frame_ts:
                    self.h264_frame_stats.pop(0)
                else:
                    break
            self.h264_frame_stats.append((frame_ts, frame_size))
            h264_fps = len(self.h264_frame_stats)
            h264_bitrate = 8 * sum(map(lambda t: t[1], self.h264_frame_stats))
            self.h264_stats_writer.writerow({"fps": h264_fps, "bitrate": h264_bitrate})


    def fly(self):
        # ...
        time.sleep(10)
        # ...

def test_streaming():
    streaming_example = StreamingExample()
    # Start the video stream
    streaming_example.start()
    # Perform some live video processing while the drone is flying
    streaming_example.fly()
    # Stop the video stream
    streaming_example.stop()


if __name__ == "__main__":
    test_streaming()

Yes, you should be able to get the video stream from your Anafi Ai through the SkyController 4. The script above should demonstrate that once your SkyController is connected to a drone, whether through its WiFi or cellular link. This example uses the olympe.SkyController.streaming API (using the olympe.Drone.streaming API only works for the original Anafi drone with a SkyController 3).

For a reference on how to pair a SkyController 4 with an Anafi Ai, see the Olympe user guide for the SkyCtrl / Drone WiFi pairing procedure and the SkyCtrl / Drone cellular pairing procedure.
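
As a side note on the statistics computed in h264_frame_cb above: the one-second sliding window can be tried out without a drone. The following is only a minimal sketch, assuming the frame timestamps are in microseconds (which is what the 1e6 constant in the example suggests). The SlidingWindowStats class and its add method are hypothetical names used for illustration and are not part of Olympe.

```python
from collections import deque


class SlidingWindowStats:
    """Hypothetical helper: frame count and bit count over a time window.

    Assumes timestamps are expressed in microseconds, as the 1e6 constant
    in the streaming example suggests.
    """

    def __init__(self, window_us=1_000_000):
        self.window_us = window_us
        self._frames = deque()  # (timestamp_us, size_bytes) pairs, oldest first
        self._bytes = 0  # running sum of the sizes currently in the window

    def add(self, timestamp_us, size_bytes):
        """Record one frame and return (frames_in_window, bits_in_window)."""
        # Drop frames that fell out of the window. This mirrors the
        # `(start_ts + 1e6) < frame_ts` test of the streaming example.
        while self._frames and self._frames[0][0] + self.window_us < timestamp_us:
            _, old_size = self._frames.popleft()
            self._bytes -= old_size
        self._frames.append((timestamp_us, size_bytes))
        self._bytes += size_bytes
        return len(self._frames), 8 * self._bytes
```

With the default one-second window, the two returned values can be read as frames per second and bits per second. A deque with a running byte total avoids re-summing the whole list on every frame, which is the only intended difference from the list-based version in the example.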

Yes, it’s possible. Likewise, you should be able to use the olympe.SkyController.media API to access the media ReST API. Could you please test the following media API example?

import olympe
import olympe.log
import os
import subprocess
import tempfile
from olympe.media import (
    media_created,
    resource_created,
    media_removed,
    resource_removed,
    resource_downloaded,
    indexing_state,
    delete_media,
    download_media,
    download_media_thumbnail,
    MediaEvent,
)
from olympe.messages.camera2 import Command, Config, Event
from logging import getLogger

olympe.log.update_config(
    {
        "loggers": {
            "olympe": {"level": "INFO", "handlers": ["console"]},
            "ulog": {"level": "INFO", "handlers": ["console"]},
            __name__: {"level": "DEBUG", "handlers": ["console"]},
        }
    }
)

logger = getLogger(__name__)

SKYCTRL_IP = os.environ.get("SKYCTRL_IP", "192.168.53.1")


class MediaEventListener(olympe.EventListener):
    def __init__(self, media):
        super().__init__(media, timeout=60)
        self._media = media
        self._media_id = []
        self._downloaded_resources = []
        self.remote_resource_count = 0
        self.local_resource_count = 0

    @olympe.listen_event(media_created())
    def onMediaCreated(self, event, scheduler):
        self._media_id.append(event.media_id)
        logger.info(f"media_created {event.media_id}")
        # When using the photo burst mode, the `media_created` event is sent by the
        # drone when the first photo resource is available for download.  The
        # `media_created` event does not include a full listing of all the future
        # resources of this media. The `resource_created` event will be sent
        # by the drone for the remaining resources.
        # However, the "download_media" and "download_media_thumbnail" will do
        # the right thing and download for you any subsequent resources associated
        # to this media id automatically.
        self._media(
            download_media_thumbnail(event.media_id) & download_media(event.media_id)
        )

    @olympe.listen_event(resource_created())
    def onResourceCreated(self, event, scheduler):
        logger.info(f"resource_created {event.resource_id}")

    @olympe.listen_event(media_removed())
    def onMediaRemoved(self, event, scheduler):
        logger.info(f"media_removed {event.media_id}")

    @olympe.listen_event(resource_removed())
    def onResourceRemoved(self, event, scheduler):
        logger.info(f"resource_removed {event.resource_id}")

    @olympe.listen_event(resource_downloaded())
    def onResourceDownloaded(self, event, scheduler):
        if event.is_thumbnail:
            return
        logger.info(
            "resource_downloaded {} {}".format(
                event.resource_id,
                event.data["download_path"],
            )
        )
        self._downloaded_resources.append(
            self._media.resource_info(resource_id=event.resource_id)
        )

    @olympe.listen_event()
    def default(self, event, scheduler):
        if isinstance(event, MediaEvent):
            logger.info(event)

    def unsubscribe(self):
        self._media.wait_for_pending_downloads()
        # Sanity check 1/2: md5 checksum
        # The integrity check has already been performed by Olympe
        # For this example the following step demonstrates how to perform the media
        # integrity check afterward using the "md5sum --check *.md5" command.
        for resource in self._downloaded_resources:
            check = subprocess.run(
                ["md5sum", "--check", resource.download_md5_path],
                stdout=subprocess.PIPE,
                cwd=os.path.dirname(resource.download_path),
            )
            stdout = check.stdout.decode().strip()
            if check.returncode == 0:
                logger.info("Integrity check: " + stdout)
            else:
                logger.error("Integrity check: " + stdout)
                super().unsubscribe()
                return

        # Sanity check 2/2: local downloaded resources equals the number of remote
        # resources
        self.remote_resource_count = sum(
            map(
                lambda id_: len(self._media.resource_info(media_id=id_)), self._media_id
            )
        )
        self.local_resource_count = len(self._downloaded_resources)
        if self.local_resource_count != self.remote_resource_count:
            logger.error(
                "Downloaded {} resources instead of {}".format(
                    self.local_resource_count,
                    self.remote_resource_count,
                )
            )
            super().unsubscribe()
            return

        # OK then, we can now safely delete the remote media
        for media_id in self._media_id:
            delete = delete_media(media_id, _timeout=10)
            if not self._media(delete).wait().success():
                logger.error(f"Failed to delete media {media_id} {delete.explain()}")
        super().unsubscribe()


def setup_photo_burst_mode(drone):
    # For the file_format: jpeg is the only available option
    # dng is not supported in burst mode
    drone(
        Command.Configure(
            camera_id=0,
            config=Config(
                camera_mode="photo",
                photo_mode="burst",
                photo_file_format="jpeg",
                photo_burst_value="14_over_1s",
                photo_dynamic_range="standard",
                photo_resolution="12_mega_pixels",
            ),
            _timeout=3.0,
        )
    ).wait()


def test_media():
    # By default, the SkyController class instantiates an internal olympe.Media object
    # (media_autoconnect=True by default). This olympe.Media object is exposed through
    # the SkyController.media property. In this case the connection to the remote media
    # API endpoint is automatically handled by the olympe.SkyController controller
    # class.
    with olympe.SkyController(SKYCTRL_IP) as skyctrl:
        assert skyctrl.connect(retry=5, timeout=60)
        setup_photo_burst_mode(skyctrl)
        skyctrl.media.download_dir = tempfile.mkdtemp(
            prefix="olympe_skyctrl_media_example_"
        )
        skyctrl.media.integrity_check = True
        logger.info("waiting for media resources indexing...")
        if (
            not skyctrl.media(indexing_state(state="indexed"))
            .wait(_timeout=60)
            .success()
        ):
            logger.error("Media indexing timed out")
            return
        logger.info("media resources indexed")
        with MediaEventListener(skyctrl.media) as media_listener:
            media_state = skyctrl(media_created(_timeout=3.0))
            photo_capture = skyctrl(
                Event.Photo(
                    type="stop",
                    stop_reason="capture_done",
                    _timeout=3.0,
                    _policy="wait",
                )
                & Command.StartPhoto(camera_id=0)
            ).wait()
            assert photo_capture, photo_capture.explain()
            media_state.wait()
            assert media_state, media_state.explain()
        assert media_listener.remote_resource_count > 0, "remote resource count == 0"
        assert (
            media_listener.remote_resource_count == media_listener.local_resource_count
        ), "remote resource count = {} != {}".format(
            media_listener.remote_resource_count, media_listener.local_resource_count
        )
        assert skyctrl.disconnect()


if __name__ == "__main__":
    test_media()
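
The integrity check in unsubscribe above shells out to the md5sum binary, which may not be available on every platform. As a rough alternative, the same comparison can be sketched with the Python standard library. This assumes the .md5 file uses the usual md5sum text format ("<hex digest>  <file name>" on its first line) and that the named file sits in the same directory as the .md5 file. Both are assumptions on my side, and md5_matches is a hypothetical helper name, not an Olympe function.

```python
import hashlib
from pathlib import Path


def md5_matches(md5_path, chunk_size=1 << 20):
    """Hypothetical helper: check a file against its md5sum-style digest file.

    Assumes the first line of `md5_path` looks like "<hex digest>  <file name>"
    and that the file name is relative to the directory of `md5_path`.
    """
    md5_path = Path(md5_path)
    first_line = md5_path.read_text().splitlines()[0]
    expected, _, name = first_line.partition(" ")
    # md5sum writes "  name" in text mode and " *name" in binary mode.
    name = name.strip().lstrip("*")
    digest = hashlib.md5()
    # Hash the file in chunks so large videos are not loaded into memory at once.
    with open(md5_path.parent / name, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest() == expected.lower()
```

In the listener this could stand in for the subprocess.run call, for example md5_matches(resource.download_md5_path), provided the assumptions above hold for the files Olympe writes.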

The 192.168.53.1:180 address suggested in previous posts was valid for the SkyController 3 and the original Anafi, but not for the Anafi Ai and the SkyController 4.
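
If you want to check whether a given address and port (such as the 192.168.53.1:180 from the question) accepts TCP connections at all before involving Olympe, a small probe can help narrow things down. This is only a diagnostic sketch using the standard socket module. probe_tcp is a hypothetical helper name, and which ports the SkyController 4 actually exposes is not something this snippet knows.

```python
import socket


def probe_tcp(host, port, timeout=3.0):
    """Hypothetical helper: try a TCP connect and describe the outcome.

    Returns "open", "refused", "timeout" or "error: <details>".
    """
    try:
        # create_connection performs the TCP handshake and nothing more.
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        return "refused"
    except socket.timeout:
        return "timeout"
    except OSError as exc:
        # Covers unreachable networks, resets during connect, and similar.
        return f"error: {exc}"
```

For example, print(probe_tcp("192.168.53.1", 180)) reports how that endpoint answers. Note that "open" only means something is listening. A “Connection reset by peer” can still occur later, once the application protocol starts talking.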

Yes, it’s possible to connect Olympe to an Anafi Ai using the drone cellular link.
Have you found the “Cellular (4G) pairing of a SkyController and a drone” section of the user guide?

For this to work, the PC running Olympe should have access to the internet and be connected to a SkyController 4. The initial SkyController 4 / Anafi Ai cellular pairing requires the SkyCtrl to be connected to the drone over WiFi first.

The cellular connectivity relies on an authentication mechanism provided by the SkyController 4 hardware secure element. It’s not currently possible to establish a cellular connection to the drone without a SkyController 4.

I hope this answers all of your questions. In any case, please let me know if you need any further information. Thanks

Thanks @ndessart, that helps. I will let you know if I have any follow-up questions. You mention it’s not currently possible to establish a cellular connection to the drone without a SkyController 4. Is the ability to connect to the drone directly over 4G scheduled for the future?

Thanks,
Dan.
