I am getting some tracebacks when using olympe. Sometimes it prints out a few stack traces, like these:
exception calling callback for <Future at 0x7f98e1767b50 state=finished returned NoneType>
Traceback (most recent call last):
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/home/parrot/code/parrot-groundsdk/packages/olympe/src/olympe/arsdkng/expectations.py", line 252, in <lambda>
    id(subscriber)
KeyError: 140294710781968
exception calling callback for <Future at 0x7f98e17fafd0 state=finished returned NoneType>
Traceback (most recent call last):
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/home/parrot/code/parrot-groundsdk/packages/olympe/src/olympe/arsdkng/expectations.py", line 252, in <lambda>
    id(subscriber)
KeyError: 140294710781968
So this looks like it might be a bug in the olympe codebase. The lambda in the traceback is part of _notify_subscribers (github dot com - /Parrot-Developers/olympe/blob/master/src/olympe/arsdkng/expectations.py#L228), which runs when there is an event to notify.
It seems to do the following:
- creates a dict called defaults of the current subscribers' ._default attributes, if they exist
- iterates over the subscriber list
  - calls the subscriber's .notify()
  - if .notify() returns False, it skips that subscriber
    - I think this .notify() is here - github dot com - /Parrot-Developers/olympe/blob/master/src/olympe/arsdkng/listener.py#L77 - it looks like it returns True if the event was added
  - if .notify() returned True (event was added), it:
    - removes the subscriber from the defaults dict if it has a ._default
    - then starts an async process that runs the subscriber's .process function
      - .process just looks at its event queue and calls the proper callbacks until the queue is empty
    - then adds that async future to the dict of running subscribers, keyed by the id() of the subscriber
    - then adds a done callback to that future to remove itself from running_subscribers when .process finishes
- iterates over the leftover defaults and runs default.process in an async thread (default is something passed into the Subscriber ctor)
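The steps above can be sketched as a small toy model. This is only my reading of the flow, not olympe's actual code: ToySubscriber, notify_subscribers, and the way defaults is keyed are all assumptions made for illustration, and a plain ThreadPoolExecutor stands in for whatever olympe uses to run things asynchronously.

```python
from concurrent.futures import ThreadPoolExecutor


class ToySubscriber:
    """Hypothetical stand-in for an olympe subscriber (not the real class)."""

    def __init__(self, accepts=True, default=None):
        self.accepts = accepts
        self._default = default
        self.events = []
        self.handled = []
        self.process_calls = 0

    def notify(self, event):
        # True means "event was queued", as .notify() is described above.
        if self.accepts:
            self.events.append(event)
        return self.accepts

    def process(self):
        # Drain the queue; a real subscriber would call its callbacks here.
        self.process_calls += 1
        while self.events:
            self.handled.append(self.events.pop(0))


def notify_subscribers(executor, subscribers, running_subscribers, event):
    # Defaults of the current subscribers (keying by subscriber id is assumed).
    defaults = {id(s): s._default for s in subscribers if s._default is not None}
    for subscriber in subscribers:
        if not subscriber.notify(event):
            continue  # event not queued: skip this subscriber
        defaults.pop(id(subscriber), None)  # it takes the event itself
        key = id(subscriber)
        future = executor.submit(subscriber.process)
        running_subscribers[key] = future
        # Unconditional pop when .process finishes, as in the traceback.
        future.add_done_callback(lambda _f, key=key: running_subscribers.pop(key))
    # Whatever is left in defaults gets its own async .process run.
    for default in defaults.values():
        executor.submit(default.process)
```

With a single call per subscriber this works fine; the trouble only starts when the same subscriber is scheduled again while its previous future is still registered.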
So I think the problem here is that _notify_subscribers is being called rapidly. On the first call it starts running the subscriber's .process function in an async thread, but before that future completes it enters this function again and starts another subscriber.process for the same subscriber, using the same key (id(subscriber)) to track that async thread in the running_subscribers dict. The second entry overwrites the first. Now we have 2 futures running that will each try to pop the same key from the dict when they end. The first future to finish removes the key, then the second tries to do the same, but the key is already gone, and there is no check to see if the key exists before popping. Both tracebacks above show the same key with different Future objects, which fits this explanation.
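To check that this sequence really produces the message seen in the logs, here is a minimal, deterministic reproduction that does not involve olympe at all. It creates two Future objects by hand (something the standard library only recommends for testing), registers both under the same made-up key, and completes them one after the other. The names reproduce_double_pop and ListHandler are mine; the "concurrent.futures" logger is the one the standard library uses to report exceptions raised by done callbacks.

```python
import logging
from concurrent.futures import Future


class ListHandler(logging.Handler):
    """Collects log records so they can be inspected afterwards."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


def reproduce_double_pop():
    handler = ListHandler()
    logger = logging.getLogger("concurrent.futures")
    logger.addHandler(handler)
    try:
        running_subscribers = {}
        key = 1234  # stands in for id(subscriber)

        # First notification: a future is registered under the key.
        first = Future()
        running_subscribers[key] = first
        first.add_done_callback(lambda _f: running_subscribers.pop(key))

        # Second notification arrives before the first future has finished
        # and overwrites the entry under the same key.
        second = Future()
        running_subscribers[key] = second
        second.add_done_callback(lambda _f: running_subscribers.pop(key))

        first.set_result(None)   # callback pops the key
        second.set_result(None)  # callback raises KeyError, which gets logged
    finally:
        logger.removeHandler(handler)
    return running_subscribers, handler.records


if __name__ == "__main__":
    remaining, records = reproduce_double_pop()
    for record in records:
        print(record.getMessage(), "->", record.exc_info[0].__name__)
```

The KeyError never propagates to the caller; the futures machinery catches it and logs "exception calling callback for ...", which matches what olympe prints.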
It seems wrong that the same subscriber's .process would be running in parallel on 2 different threads. It doesn't look like subscriber/listener has any mutex/lock protection for multithreading (expectations does seem to have some; see the with self._attr.default.subscribers_lock that's in _notify_subscribers). But it does look like the queue being used (collections.deque) might be thread safe.
I imagine this function should not be starting a 2nd subscriber.process. But I also think that if we add some code to check whether a subscriber is already running and skip adding another future/async, there might be a race condition where an event is left to dwell for a while before the subscriber gets kicked off again: the future is just winding down out of its while loop, and before it finishes another event is handed to the subscriber. The future still exists in the dict at that point, so no new .process is started.
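One common way to avoid both the double run and the stranded event is to make "is the queue empty?" and "mark myself as not running" a single atomic step under a lock. The sketch below shows that pattern in isolation. SerializedSubscriber and all of its attributes are hypothetical names, not olympe's API, and I have not tried to fit this into olympe itself.

```python
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor


class SerializedSubscriber:
    """Hypothetical subscriber wrapper; none of these names come from olympe."""

    def __init__(self, executor, callback):
        self._executor = executor
        self._callback = callback
        self._events = deque()
        self._lock = threading.Lock()
        self._running = False

    def notify(self, event):
        with self._lock:
            self._events.append(event)
            if self._running:
                return  # the active worker will still see this event
            self._running = True
        self._executor.submit(self._drain)

    def _drain(self):
        while True:
            with self._lock:
                if not self._events:
                    # The emptiness check and the flag reset happen together,
                    # so a concurrent notify() either lands before the check
                    # or starts a fresh worker afterwards.
                    self._running = False
                    return
                event = self._events.popleft()
            self._callback(event)  # run user code outside the lock


if __name__ == "__main__":
    handled = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        subscriber = SerializedSubscriber(executor, handled.append)
        for i in range(100):
            subscriber.notify(i)
    print(len(handled), "events handled in order:", handled == list(range(100)))
```

At most one worker drains a given subscriber at any time, so events are handled in order. One limitation of the sketch: if the callback raises, the running flag is never cleared, so real code would need a try/finally or similar around the callback.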
The queue they are using for events seems like it might be thread safe as long as one thread is pushing and the other is popping from the other end (see "8.3. collections: High-performance container datatypes" in the Python 2.7.18 documentation), so at least this thread adding events with .notify() while the future/async pops off of it should be thread safe. I'm not sure what happens if 2 threads are popping off the same end. It might be okay even with multiple threads of the same subscriber popping off one side? If that is the case, the bug is just the blind removal of the key when multiple subscriber.process calls are running.
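A quick experiment for the "2 threads popping off the same end" question, independent of olympe. The collections documentation describes deque appends and pops from either side as thread-safe, so each individual popleft() should hand an item to exactly one thread. What is not safe is a check-then-pop loop (while queue: queue.popleft()), because the other thread can take the last item between the check and the pop, which raises IndexError. Whether olympe's .process is written that way is something I have not verified. The function name below is made up.

```python
import threading
from collections import deque


def drain_with_two_threads(n_items=10_000):
    queue = deque(range(n_items))
    taken = [[], []]  # one result list per consumer thread

    def worker(out):
        while True:
            try:
                # Pop first and handle emptiness afterwards, instead of
                # checking the length before popping.
                item = queue.popleft()
            except IndexError:
                return  # queue is empty
            out.append(item)

    threads = [threading.Thread(target=worker, args=(out,)) for out in taken]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return taken


if __name__ == "__main__":
    first, second = drain_with_two_threads()
    everything = sorted(first + second)
    print("no item lost or duplicated:", everything == list(range(10_000)))
```

So two consumers on one side do not corrupt the deque, but callbacks for a single subscriber could then run concurrently and out of order, which may or may not matter for a given callback.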
I can get rid of the KeyErrors by not adding a future/async if the subscriber is already running its .process, by adding:
    if id(subscriber) in self._attr.default.running_subscribers:
        getLogger("olympe.expectations").warning(
            f"Subscriber {subscriber} with id {id(subscriber)} already running"
        )
        continue
This goes right before the run_async future is created. The warning can be removed as well; it's just there to show when the check triggers.
Has anyone encountered similar issues/tracebacks and dug into this code? Does anyone have thoughts on the analysis above, or any other ideas?