PointNFly anafi AI

Hello there,
I am struggling with the PointNfly feature of olympe using an anafi ai under sphinx without GUI (I have my own):

expectation_point = self.drone(
        Execute(point=dict(Point(gimbal_control_mode=msg.poi_mode, latitude=msg.latitude, longitude=msg.longitude,altitude=msg.altitude)))
        >> State(active=dict(point=dict(Point(gimbal_control_mode=msg.poi_mode, latitude=msg.latitude, longitude=msg.longitude,altitude=msg.altitude))))
        >> FlyingStateChanged(state="flying")
    ).wait(_timeout=10)
    if not expectation_point.success():
        print("PointNfFly The drone has failed executing the 'Point' directive")

        expectation_pointnfly_above_alt = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_ABOVE_MAX_ALTITUDE"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_above_alt.success():
            print("PointNFly failed because above alt")
            return
        
        expectation_pointnfly_not_flying = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_NOT_FLYING"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_not_flying.success():
            print("PointNFly is not possible when the drone is not flying")
            return
        
        expectation_pointnfly_not_calib = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_NOT_CALIBRATED"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_not_calib.success():
            print("PointNFly failed because not calib")
            return
        
        expectation_pointnfly_closetogroud = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_TOO_CLOSE_TO_GROUND"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_closetogroud.success():
            print("PointNFly failed because too close to ground")
            return
        
        expectation_pointnfly_no_batt = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_INSUFFICIENT_BATTERY"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_no_batt.success():
            print("PointNFly failed because no battery")
            return
        
        expectation_pointnfly_no_gps = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_GPS_INFO_INACCURATE"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_no_gps.success():
            print("PointNFly failed because no gps")
            return
        
        expectation_pointnfly_out_geo = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_OUT_GEOFENCE"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_out_geo.success():
            print("PointNFly failed because out geofence")
            return
        return
    print("PointNFly 'Point' directive is executing! The drone is pointing to it")

I tried to get the info why the Point was failing but nothing it just returns :

PointNfFly The drone has failed executing the 'Point' directive

Do you have any idea why it doesn’t work for me ?
Thanks !

Hello Jatou,

That may be induced by the state of your drone when sending this command. The drone should be flying in a ‘hovering’ state for instance. In what state is your drone when you’re requesting this Point directive?

More over, why did you add additional dict in your Execute command?
Have you tried :

expectation_point = self.drone(
    Execute(point=Point(gimbal_control_mode=msg.poi_mode, latitude=msg.latitude, longitude=msg.longitude,altitude=msg.altitude))
    >> State(active=dict(point=Point(gimbal_control_mode=msg.poi_mode, latitude=msg.latitude, longitude=msg.longitude,altitude=msg.altitude)))
    >> FlyingStateChanged(state="flying")
).wait(_timeout=10)

Hello,
When calling a point directive my drone is in hovering state.
This is my verification code :

expectation_point = self.drone(
    Execute(point=Point(gimbal_control_mode=msg.poi_mode, latitude=msg.latitude, longitude=msg.longitude,altitude=msg.altitude))
    >> State(active=dict(point=Point(gimbal_control_mode=msg.poi_mode, latitude=msg.latitude, longitude=msg.longitude,altitude=msg.altitude)))
    >> FlyingStateChanged(state="flying")
    ).wait(_timeout=10)
execution = self.drone(expectation_point).wait(30)
if not execution.success():
        print("PointNfFly The drone has failed executing the 'Point' directive")
        expectation_point.explain()
        print(f" EXPLAIN : {expectation_point.explain()}")
        # print(f"PointNFly unavailibility reasons {self.drone(State(unavailable=dict(reasons=[])))}")
        # unavailable_state = 
        # reasons = unavailable_state # .unavailable.reasons
        i=0
        for reasons in self.drone(State(unavailable=dict())):
            print(f"PointNFly unavailibility type : {type(reasons)}, reasons : {vars(reasons)}, {i}")
            i=i+1
            # if i==1:
            #    print(f"PointNFly unavailibility reasons {reasons.Unavailable}")
        expectation_pointnfly_above_alt = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_ABOVE_MAX_ALTITUDE"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_above_alt.success():
            print("PointNFly failed because above alt")
            return
        
        expectation_pointnfly_not_flying = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_NOT_FLYING"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_not_flying.success():
            print("PointNFly is not possible when the drone is not flying")
            return
        
        expectation_pointnfly_not_calib = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_NOT_CALIBRATED"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_not_calib.success():
            print("PointNFly failed because not calib")
            return
        
        expectation_pointnfly_closetogroud = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_TOO_CLOSE_TO_GROUND"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_closetogroud.success():
            print("PointNFly failed because too close to ground")
            return
        
        expectation_pointnfly_no_batt = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_INSUFFICIENT_BATTERY"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_no_batt.success():
            print("PointNFly failed because no battery")
            return
        
        expectation_pointnfly_no_gps = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_GPS_INFO_INACCURATE"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_no_gps.success():
            print("PointNFly failed because no gps")
            return
        
        expectation_pointnfly_out_geo = self.drone(
        State(unavailable=dict(reasons=["UNAVAILABILITY_REASON_DRONE_OUT_GEOFENCE"]))
        ).wait(_timeout=10)
        if expectation_pointnfly_out_geo.success():
            print("PointNFly failed because out geofence")
            return
        print("PointNFly unavailable for none of the unavailibility reasons")
        return
print("PointNFly 'Point' directive is executing! The drone is pointing to it")

And this is the error when publishing this ros message :
rostopic pub /OlympeBridge/PointNFly/Point olympe_bridge/Poi “{latitude: XX, longitude: XX, altitude: 10.0, poi_mode: 0}” :

PointNfFly The drone has failed executing the 'Point' directive
        pointnfly.Event.State(
            active=pointnfly.State.Active(
                point=dict(gimbal_control_mode=0, latitude=XX, longitude=XX, altitude=10.0)
PointNFly unavailibility type : <class 'olympe.arsdkng.expectations.ArsdkCheckStateExpectation'>, reasons : {'_future': <Future at 0x7f2af43792e0 state=cancelled>, '_awaited': True, '_scheduler': <olympe.media.MediaScheduler object at 0x7f2af71bcd30>, '_success': False, '_timeout': None, '_deadline': None, '_timedout': False, '_scheduled_condition': <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f2af42636c0>, 0)>, '_asyncio_future': None, '_float_tol': (1e-07, 1e-09), 'expected_message': <olympe.arsdkng.messages.pointnfly.Event.State object at 0x7f2af4263fa0>, 'expected_event_type': <class 'olympe.arsdkng.events.ArsdkProtoMessageEvent'>, 'expected_args': OrderedDict([('unavailable', {})]), 'matched_state': None}, 0
PointNFly unavailibility type : <class 'olympe.arsdkng.expectations.ArsdkEventExpectation'>, reasons : {'_future': <Future at 0x7f2af4433f40 state=pending>, '_awaited': True, '_scheduler': <olympe.media.MediaScheduler object at 0x7f2af71bcd30>, '_success': False, '_timeout': None, '_deadline': None, '_timedout': False, '_scheduled_condition': <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f2af4433270>, 0)>, '_asyncio_future': None, '_float_tol': (1e-07, 1e-09), 'expected_message': <olympe.arsdkng.messages.pointnfly.Event.State object at 0x7f2af44336d0>, 'expected_args': OrderedDict([('unavailable', {})]), 'expected_event_type': <class 'olympe.arsdkng.events.ArsdkProtoMessageEvent'>, 'received_args': [], '_received_events': [], 'matched_args': OrderedDict()}, 1
PointNFly unavailable for none of the unavailibility reasons

I can’t seem to do it right. Any help would be great !

Hey Jatou,

First, there’s no need to redefine a second expectation to wait for :slight_smile:
You could simply do it this way :

expectation_point = self.drone(
    Execute(point=Point(gimbal_control_mode=msg.poi_mode, latitude=msg.latitude, longitude=msg.longitude,altitude=msg.altitude))
    >> State(active=dict(point=Point(gimbal_control_mode=msg.poi_mode, latitude=msg.latitude, longitude=msg.longitude,altitude=msg.altitude)))
    >> FlyingStateChanged(state="flying")
    ).wait(_timeout=30)
if not expectation_point.success():

It won’t change anything because the class drone() returns the expectation passed as argument for convenience, but it just the way we usually do

Secondly, it seems like the State command is the guilty one, not to offend you but just to be sure it is not a careless error, you did change the real values of your latittude and longitude in your error log by XX to hide it, but the values you’re using to expect your command are real ones?

Then, I do not see any check of your flight state before sending a Point directive
That’s why this check is quite necessary to pass before sending a directive :

expectation_hovering = drone(FlyingStateChanged(state="hovering")).wait(_timeout=30)
if not expectation_hovering.success():
    print("The drone is not hovering")
    return
print("Hovering succeeded! The drone is flying in 'hovering' state!")

Once the drone has reached this hovering state, you can make sure it is ready to receive a future pointandfly directive

# Make sure that the drone can apply a directive in its 'hovering' (ie flying) state
expectation_pointnfly_flying = drone(State(idle=dict())).wait(_timeout=30)
if not expectation_pointnfly_flying.success():
    print("PointNFly is not possible even if the drone is flying")
    return
print("PointNFly correct state 'idle' when the drone is in 'hovering' state!")

Finally, the code I’ve provided you to check why the drone is not getting pointandfly directives once it is in the correct hovering flying state could help us :slight_smile: :

possible_unavailability_reasons = drone(State()).wait(
    _timeout=15.0
)
assert possible_unavailability_reasons.success()

unavailability_reasons = (
    possible_unavailability_reasons.received_events().last().args["unavailable"]
)
print(f"Unavailibity reasons : {unavailability_reasons}")

Hello,
Yes I have good values for latitude, longitude. Altitude is from ground or gps ? I tried both and it did not work.
I tried what you gave me :

expectation_hovering = drone(FlyingStateChanged(state="hovering")).wait(_timeout=30)
if not expectation_hovering.success():
    print("The drone is not hovering")
    return
print("Hovering succeeded! The drone is flying in 'hovering' state!")
# Make sure that the drone can apply a directive in its 'hovering' (ie flying) state
expectation_pointnfly_flying = drone(State(idle=dict())).wait(_timeout=30)
if not expectation_pointnfly_flying.success():
    print("PointNFly is not possible even if the drone is flying")
    return
print("PointNFly correct state 'idle' when the drone is in 'hovering' state!")

And it returned :

PointNFly is not possible even if the drone is flying

So I tried to get the state of The point directive but here you get the state of the drone not of the point directive, isn’t it :

drone(State(idle=dict())).wait(_timeout=30)

Assuming you get the state of the point directive, i tried :

    if not expectation_pointnfly_flying.success():
        rospy.logdebug("PointNFly : not idle")
        expectation_pointnfly_active = self.drone(State(active=dict())).wait(_timeout=10)
        if not expectation_pointnfly_active.success():
            print("PointNFly : not active")
            expectation_pointnfly_unavailable = self.drone(State(unavailable=dict())).wait(_timeout=10)
            if not expectation_pointnfly_unavailable.success():
                print("PointNFly : not available")
        return

What returns :

PointNFly : not idle
PointNFly : not active
PointNFly : not available

But here I assume it is a problem with my code. I really don’t know what is wrong there

Hey Jatou,

The State in question comes from the PointNFly proto message, so it’s the state of the pointnfly feature of the drone, not the global state of the drone (we have plenty of states possible) :slight_smile: so it actually is the state of the point/fly directives

Okey so your pointnfly is indeed not available, but it seems like your drone is effectively hovering
When printing the args of the expectation_pointnfly_unavailable expectation, you still do have an empty list don’t you?

unavailability_reasons = (
    possible_unavailability_reasons.received_events().last().args["unavailable"]
)
print(f"Unavailibity reasons : {unavailability_reasons}")

What bothers me is that my code works for me and I can’t reproduce your issue

Actually, the list is not empty :

if not execution.success():
        print("PointNfFly The drone has failed executing the 'Point' directive")
        expectation_point.explain()
        print(f" EXPLAIN : {expectation_point.explain()}")

Returns :

PointNfFly The drone has failed executing the 'Point' directive
        pointnfly.Event.State(
            active=pointnfly.State.Active(
                point=dict(gimbal_control_mode=0, latitude=XX, longitude=XX, altitude=10.0)
PointNFly unavailibility type : <class 'olympe.arsdkng.expectations.ArsdkCheckStateExpectation'>, reasons : {'_future': <Future at 0x7f2af43792e0 state=cancelled>, '_awaited': True, '_scheduler': <olympe.media.MediaScheduler object at 0x7f2af71bcd30>, '_success': False, '_timeout': None, '_deadline': None, '_timedout': False, '_scheduled_condition': <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f2af42636c0>, 0)>, '_asyncio_future': None, '_float_tol': (1e-07, 1e-09), 'expected_message': <olympe.arsdkng.messages.pointnfly.Event.State object at 0x7f2af4263fa0>, 'expected_event_type': <class 'olympe.arsdkng.events.ArsdkProtoMessageEvent'>, 'expected_args': OrderedDict([('unavailable', {})]), 'matched_state': None}, 0
PointNFly unavailibility type : <class 'olympe.arsdkng.expectations.ArsdkEventExpectation'>, reasons : {'_future': <Future at 0x7f2af4433f40 state=pending>, '_awaited': True, '_scheduler': <olympe.media.MediaScheduler object at 0x7f2af71bcd30>, '_success': False, '_timeout': None, '_deadline': None, '_timedout': False, '_scheduled_condition': <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f2af4433270>, 0)>, '_asyncio_future': None, '_float_tol': (1e-07, 1e-09), 'expected_message': <olympe.arsdkng.messages.pointnfly.Event.State object at 0x7f2af44336d0>, 'expected_args': OrderedDict([('unavailable', {})]), 'expected_event_type': <class 'olympe.arsdkng.events.ArsdkProtoMessageEvent'>, 'received_args': [], '_received_events': [], 'matched_args': OrderedDict()}, 1
PointNFly unavailable for none of the unavailibility reasons

It says your state is unavailable for directives, but without giving the reason
What version of firmware are you using?

I am using sphinx for tests :
sphinx --version
Parrot-Sphinx simulator version 2.15.1

Gazebo multi-robot simulator, version 11.11.0
Copyright (C) 2012 Open Source Robotics Foundation.
Released under the Apache 2 License.

And my drone (never tested yet) : 7.7.1