Subscriber Future KeyError

I am having some traebacks running when I am using olympe. Sometimes it prints out a few stack traces, like these:

exception calling callback for <Future at 0x7f98e1767b50 state=finished returned NoneType>
Traceback (most recent call last):
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/home/parrot/code/parrot-groundsdk/packages/olympe/src/olympe/arsdkng/expectations.py", line 252, in <lambda>
    id(subscriber)
KeyError: 140294710781968
exception calling callback for <Future at 0x7f98e17fafd0 state=finished returned NoneType>
Traceback (most recent call last):
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/home/parrot/code/parrot-groundsdk/packages/olympe/src/olympe/arsdkng/expectations.py", line 252, in <lambda>
    id(subscriber)
KeyError: 140294710781968

So this looks like it might be a bug in the olympe codebase. The function snippet is part of _notify_subscribers (github dot com - /Parrot-Developers/olympe/blob/master/src/olympe/arsdkng/expectations.py#L228). It runs when there is an event to notify.

Seems it does the following:

  1. creates a dict called defaults of the current subscribers ._default attributes if they exist
  2. itereates subscriber list
    1. calls subscriber .notify()
    2. if the .notify() returns false it skips it
      1. I think this .notify() is here - github dot com - /Parrot-Developers/olympe/blob/master/src/olympe/arsdkng/listener.py#L77
      2. it looks like it returns True if the event was added
    3. if .notify() was true (event was added), it:
      1. removes the subscriber from the defaults list if it has a ._default
      2. then it starts an async process that runs subscriber’s .process function
        1. .process just looks at it’s event queue and calls the proper callbacks till empty
      3. it then adds that async future to the dict of running subscribers with a key of the id() of the subscriber
      4. it then adds a done callback to that future to remove itself from the running_subscribers when .process finishes
  3. iterates over the leftover defaults list and runs the default.process in an async thread (default is something passed into Subscriber ctor)

so I think the problem here is that _notify_subscribers is being called rapidly and on the first iteration it starts running the subscriber’s .process function in an async thread, but then before that future completes it enters this function again and starts another subscriber.process function but then uses the same key for tracking that async thread in the running_subscribers dict. Now we have 2 futures running that will try to pop the same key from the dict when they end. The first future removes itself from running, then the 2nd future tries to do the same, but the first future already removed that key from the dict, and there is no check to see if the key exists before popping.

It seems wrong that the same subscriber would be running a subscriber’s .process in parallel on 2 different threads. It doesn’t look like subscriber/listener has any mutex/lock multithreaded protections (expectations does seem to have these - see the with self._attr.default.subscribers_lock that’s in _notify_subscribers. But it does look like the queue (collections.deque) being used might be thread safe.

I imagine this function should not be running a 2nd subscriber.process. But I also think if we add some code to check if a subscriber is already running and don’t add another future/async, there might be a race condition where an event gets left to dwell a little while before the subscriber can get kicked off again because the future is just winding down out of the while loop and before the future finishes, another event is thrown down to the subscriber and the future still exists in the dict so it doesn’t start a new .process.

The queue they are using for events seems like it might be threadsafe as long as one thread is pushing and the other is popping from the other end (8.3. collections — High-performance container datatypes — Python 2.7.18 documentation), so at least this thread adding events with .notify() while the future/async pops off of it should be thread safe. Not sure what happens if 2 threads are popping off the same end. It might be okay even with the same subscriber popping off one side with multiple threads? If this is the case, the bug is just trying to blindly remove the key if there are multiple subscriber.process’es running.

I can get rid of the KeyErrors by not adding a future/async if the subscriber is already running it’s .process by adding:

                    if id(subscriber) in self._attr.default.running_subscribers:
                        getLogger("olympe.expectations").warning(f"Subscriber {subscriber} with id {id(subscriber)} already running")
                        continue

Right before the run_async future is created.

Can remove the warning as well - that’s just to see it triggering.

Anyone encounter similar issues/tracebacks and have dug into this code? Anyone have any thoughts on the analysis above or any ideas?

Hi,

Thank you for reporting this issue

I haven’t reproduced this issue but anyway your analysis seems correct here. If the same subscriber is processing two subsequent events, the first future to complete pop the second from the running_subscribers dictionary and the second future produce the above stack trace.

The processing is running asynchronously in one (and only one) dedicated thread (self._attr.default.subscribers_thread_loop) for all subscribers. I don’t think there is any concurrency or race condition issue here.

With this workaround, events are being dropped instead of being processed by the subscriber so the logs may look more friendly but I think I’d prefer the subscriber to process all events even if that means the scheduler is loosing track of what’s currently running.

I’ll keep you informed when a fix will be available.

Thank you

Are the events being dropped? The ._notify occurs above that line, so the event is added to the queue (which I assume will be consumed by the subscriber.process that is already running)

Another method might be to do a quick check on whether the key exists before pop’ing? But then you likely can’t use the lambda since it’s more than one expression.

Hi,

I think the following patch should fix this issue:

diff --git a/src/olympe/arsdkng/expectations.py b/src/olympe/arsdkng/expectations.py
index 380002e..f3cbb44 100644
--- a/src/olympe/arsdkng/expectations.py
+++ b/src/olympe/arsdkng/expectations.py
@@ -44,7 +44,7 @@ from boltons.setutils import IndexedSet
 from concurrent.futures import Future, as_completed
 from concurrent.futures import TimeoutError as FutureTimeoutError
 from concurrent.futures import CancelledError as FutureCancelledError
-from collections import OrderedDict, deque
+from collections import OrderedDict, defaultdict, deque
 from collections.abc import Iterable, Mapping
 from logging import getLogger
 from olympe._private import (
@@ -145,7 +145,7 @@ class DefaultScheduler(AbstractScheduler):
         # Subscribers internal state
         self._attr.default.subscribers_lock = threading.Lock()
         self._attr.default.subscribers = []
-        self._attr.default.running_subscribers = OrderedDict()
+        self._attr.default.running_subscribers = defaultdict(list)
         self._attr.default.subscribers_thread_loop = PompLoopThread(
             self._attr.default.logger
         )
@@ -249,12 +249,12 @@ class DefaultScheduler(AbstractScheduler):
                     future = self._attr.default.subscribers_thread_loop.run_async(
                         subscriber.process
                     )
-                    self._attr.default.running_subscribers[id(subscriber)] = future
+                    self._attr.default.running_subscribers[id(subscriber)].append(future)
                     future.add_done_callback(
                         functools.partial(
-                            lambda subscriber, _: self._attr.default.running_subscribers.pop(
+                            lambda subscriber, _: self._attr.default.running_subscribers[
                                 id(subscriber)
-                            ),
+                            ].remove(future),
                             subscriber,
                         )
                     )
@@ -305,6 +305,6 @@ class DefaultScheduler(AbstractScheduler):
         with self._attr.default.subscribers_lock:
-            future = self._attr.default.running_subscribers.pop(id(subscriber), None)
-            if future is not None:
+            futures = self._attr.default.running_subscribers.pop(id(subscriber), [])
+            for future in futures:
                 try:
                     future.result(subscriber.timeout)
                 except Exception as e:

With this change, running_subscribers is now a subriber(id) -> list(future) mapping, so each subscriber can now be notified multiple times before being processed.

Please let me know if that works for you. Thanks

The subscriber.notify(event) just asks the subscriber if it want to process this event. The actual processing of an event in performed in another thread by subscriber.process().
With your proposed patch, yes the events are eventually being dropped (because of a race condition).

I tried this patch, I am still getting an error, on the .remove line:

Although it does seem to have this error less than the KeyError it was getting before…

exception calling callback for <Future at 0x7ffad033cc90 state=finished returned NoneType>
Traceback (most recent call last):
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/home/parrot/code/parrot-groundsdk/packages/olympe/src/olympe/arsdkng/expectations.py", line 251, in <lambda>
    ].remove(future),
ValueError: list.remove(x): x not in list

I think I may have figured how our code is interacting with olympe to cause this issue.

The code I am using sets up an olympe.EventListener. It has a function that is listening to many events and publishing them off somewhere else - basically a forwarder. But then also has another function that is listening to specific overlapping events and does some processing on that data.

I guess even with this new code subscriber isn’t specific enough because the subscriber has effectively 2 listeners for the same event. So when the .remove() or the older .pop() is being called it removes the future but then the other listener function is also working on the same item and eventually also calls the lambda to remove it.

Thank you for your feedback! This time I think I got it right with the following patch:

diff --git a/src/olympe/arsdkng/expectations.py b/src/olympe/arsdkng/expectations.py
index 8c0bd88..feb9755 100644
--- a/src/olympe/arsdkng/expectations.py
+++ b/src/olympe/arsdkng/expectations.py
@@ -44,7 +44,7 @@ from boltons.setutils import IndexedSet
 from concurrent.futures import Future, as_completed
 from concurrent.futures import TimeoutError as FutureTimeoutError
 from concurrent.futures import CancelledError as FutureCancelledError
-from collections import OrderedDict, deque
+from collections import OrderedDict, defaultdict, deque
 from logging import getLogger
 from olympe._private import (
     callback_decorator,
@@ -139,7 +139,7 @@ class DefaultScheduler(AbstractScheduler):
         # Subscribers internal state
         self._attr.default.subscribers_lock = threading.Lock()
         self._attr.default.subscribers = []
-        self._attr.default.running_subscribers = OrderedDict()
+        self._attr.default.running_subscribers = defaultdict(list)
         self._attr.default.subscribers_thread_loop = PompLoopThread(
             self._attr.default.logger
         )
@@ -223,6 +223,7 @@ class DefaultScheduler(AbstractScheduler):
     def destroy(self):
         self.stop()
         self._attr.default.subscribers_thread_loop.stop()
+        self._attr.default.subscribers_thread_loop.destroy()
 
     @callback_decorator()
     def _notify_subscribers(self, event):
@@ -242,13 +243,13 @@ class DefaultScheduler(AbstractScheduler):
                     future = self._attr.default.subscribers_thread_loop.run_async(
                         subscriber.process
                     )
-                    self._attr.default.running_subscribers[id(subscriber)] = future
+                    self._attr.default.running_subscribers[id(subscriber)].append(future)
                     future.add_done_callback(
                         functools.partial(
-                            lambda subscriber, _: self._attr.default.running_subscribers.pop(
+                            lambda subscriber, future, _: self._attr.default.running_subscribers[
                                 id(subscriber)
-                            ),
-                            subscriber,
+                            ].remove(future),
+                            subscriber, future
                         )
                     )
 
@@ -296,8 +297,8 @@ class DefaultScheduler(AbstractScheduler):
         :type subscriber: Subscriber
         """
         with self._attr.default.subscribers_lock:
-            future = self._attr.default.running_subscribers.pop(id(subscriber), None)
-            if future is not None:
+            futures = self._attr.default.running_subscribers.pop(id(subscriber), [])
+            for future in futures:
                 try:
                     future.result(subscriber.timeout)
                 except Exception as e: