Using multi-threading and asyncio together
Most of the time when you write an asyncio application, it’ll be single-threaded and I/O-bound. This combination is what makes asyncio fast and efficient compared to equivalent threaded programs, because you only need one thread to wait for network and other I/O operations to complete. However, in some applications blocking code is impossible to avoid, and you’ll find yourself combining asyncio with multiple threads.
Python’s relationship with threading
Before we discuss using threads, we need to talk about Python’s Global Interpreter Lock.
On 2024-10-07, Python 3.13 was released with both GIL and no-GIL builds, the result of the tremendous work done towards PEP 703! However, “free-threaded CPython” is still experimental, and it will need a lot more community support before it can be widely adopted.
For most of Python’s lifetime, the interpreter has used a lock that prevents it from executing more than one bytecode instruction at a time across threads. This meant that true parallelism for pure-Python code couldn’t be achieved with multi-threading, so users who needed it had to look towards alternatives: multi-processing, which brings the overhead of IPC, or C extensions that release the GIL, which take more effort to maintain and must be compiled, either ahead of time by the maintainer or by the user installing the extension.
As such, Python’s multi-threading tended to be reserved for CPU-bound operations implemented in C extensions, or for I/O-bound operations. requests, for example, is a library that performs HTTP requests synchronously. Because making a request blocks the current thread, you need multiple threads to make multiple requests simultaneously instead of sequentially:
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
URLS = [...]
def request(url: str, timeout: float) -> bytes:
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.content
with ThreadPoolExecutor(max_workers=5) as executor:
    # Map each future back to the URL it is fetching
    future_to_url = {}
    for url in URLS:
        future = executor.submit(request, url, timeout=60)
        future_to_url[future] = url
    # as_completed() yields futures in the order they finish
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        data = future.result()
        print(f"{url} page returned {len(data)} bytes")
When you need a high degree of concurrency, hundreds to thousands of threads will start to strain your system’s resources. That might be enough to saturate your network, but overall threads are not a particularly efficient option for concurrent I/O.
That’s why we have asyncio. Cooperative multi-tasking via an event loop with callbacks, combined with non-blocking I/O, allows a single thread to handle many requests at once, relying on the operating system to wake up the thread when events or responses are received. Coroutines and futures then abstract the underlying callbacks into a succinct syntax that resembles the procedural style Python developers are used to:
import asyncio
import httpx
URLS = [...]
async def make_request(client: httpx.AsyncClient, url: str, timeout: float) -> tuple[str, bytes]:
    # AsyncClient.get() is a coroutine, so it is awaited rather than used with "async with"
    response = await client.get(url, timeout=timeout)
    response.raise_for_status()
    return url, response.content
async def main():
    async with httpx.AsyncClient() as client, asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(make_request(client, url, timeout=60)) for url in URLS]
        # as_completed() yields awaitables in the order the requests finish,
        # so each result carries its own URL
        for task in asyncio.as_completed(tasks):
            url, data = await task
            print(f"{url} page returned {len(data)} bytes")
asyncio.run(main())
The difference is that each task takes up far fewer resources than a thread, so a single event loop can handle concurrent tasks on the scale of hundreds of thousands.
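To get a feel for that, here’s a small sketch (my own illustration, not from any library) that starts 10,000 tasks on one event loop. asyncio.sleep() stands in for waiting on a network response, and each task records the thread it ran on:

```python
import asyncio
import threading
import time


async def fake_request() -> int:
    # Stand-in for network I/O: the task suspends here and the loop runs others
    await asyncio.sleep(0.1)
    return threading.get_ident()


async def main(n: int) -> tuple[set[int], float]:
    start = time.perf_counter()
    # gather() wraps each coroutine in a Task; all of them share one thread
    idents = await asyncio.gather(*(fake_request() for _ in range(n)))
    return set(idents), time.perf_counter() - start


thread_ids, elapsed = asyncio.run(main(10_000))
print(f"{len(thread_ids)} thread(s), {elapsed:.2f}s for 10,000 tasks")
```

On a typical machine this should finish in well under a second, all on a single thread, because the sleeps overlap instead of running one after another.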
Note
asyncio.TaskGroup was added in Python 3.11 and has similar semantics to ThreadPoolExecutor in that it ensures all tasks are completed upon exiting the context manager. But unlike the executor, TaskGroup will cancel any in-progress tasks if one of them fails. If you’re not familiar with the idea of “structured concurrency”, you should read Nathaniel J. Smith’s article about it; he is also the original author of the Trio concurrency library.
Running threads in an event loop
Because asyncio relies on cooperative multi-tasking for concurrency, all callbacks that run on the event loop need to be non-blocking. One blocking callback, which could be one step of a coroutine that incorrectly makes a request using requests instead of an async HTTP client, would cause the entire event loop to stop processing events until that callback finished.
The solution for that is simple: switch to a library that relies on asyncio’s mechanisms for non-blocking I/O, like aiohttp or httpx.
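To see what a single blocking call does to everything else on the loop, here’s a minimal sketch: a heartbeat task tries to tick every 10 ms while another step either calls time.sleep() (blocking) or awaits asyncio.sleep() (non-blocking). The heartbeat() and main() names are just for illustration:

```python
import asyncio
import time


async def heartbeat(gaps: list, stop: asyncio.Event) -> None:
    # Try to tick every 10 ms and record how long each tick really took
    last = time.perf_counter()
    while not stop.is_set():
        await asyncio.sleep(0.01)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now


async def main(blocking: bool) -> float:
    gaps: list = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(gaps, stop))
    await asyncio.sleep(0.05)  # let the heartbeat get going
    if blocking:
        time.sleep(0.5)  # blocks the thread, so the whole loop freezes
    else:
        await asyncio.sleep(0.5)  # suspends only this coroutine
    stop.set()
    await hb
    return max(gaps)


blocking_gap = asyncio.run(main(blocking=True))
nonblocking_gap = asyncio.run(main(blocking=False))
print(f"longest gap with time.sleep():    {blocking_gap:.3f}s")
print(f"longest gap with asyncio.sleep(): {nonblocking_gap:.3f}s")
```

With the blocking call, the longest gap between ticks should be at least the full half second, since nothing else on the loop can run; with asyncio.sleep() it stays close to the 10 ms interval.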
But maybe your situation is more complex: no one’s made an async library that fits your needs, there’s too much code to migrate to async/await, or your code is CPU-bound rather than I/O-bound (e.g. image processing with Pillow). In this case, you’ll need to fall back to pre-emptive multi-tasking, either with threads or subprocesses. For this guide, I’ll only cover threading.
Python 3.9 introduced asyncio.to_thread() as a convenient shorthand for submitting functions to the event loop’s default thread pool executor:
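import asyncio
from PIL.Image import Image
def process_image(image: Image) -> Image:
    ...
async def process_all(images: list[Image]) -> list[Image]:
    tasks = []
    processed = []
    async with asyncio.TaskGroup() as tg:
        for image in images:
            # Runs process_image(image) in the event loop's default executor
            coro = asyncio.to_thread(process_image, image)
            t = tg.create_task(coro)
            tasks.append(t)
    for t in tasks:
        processed.append(await t)
    return processed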
Before that, you had to call the lower-level loop.run_in_executor() method instead:
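async def process_all(images: list[Image]) -> list[Image]:
    loop = asyncio.get_running_loop()
    futures = []
    for image in images:
        # None selects the event loop's default executor
        fut = loop.run_in_executor(None, process_image, image)
        futures.append(fut)
    try:
        return await asyncio.gather(*futures)
    except BaseException:
        # Cancel work that hasn't started yet; running calls can't be interrupted
        for fut in futures:
            fut.cancel()
        raise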
You could also create your own ThreadPoolExecutor and submit tasks to that:
async def main(executor: ThreadPoolExecutor):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, process_image, image)
    # Above would be equivalent to writing:
    await asyncio.wrap_future(executor.submit(process_image, image))
with ThreadPoolExecutor(max_workers=5) as executor:
    asyncio.run(main(executor))
Keep in mind that the default thread pool executor has a maximum number of threads based on your processor’s core count (min(32, CPU count + 4) since Python 3.8). As such, you should not run long-lived tasks on the default executor: they reduce the number of workers available to other tasks and can, at worst, saturate the executor so that newly submitted tasks wait indefinitely.
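Here’s a sketch of that failure mode. To make it easy to trigger, it assumes a default executor with only two threads (installed with loop.set_default_executor()); two long-lived calls occupy both threads, so a quick third call can’t even start:

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def long_lived(stop: threading.Event) -> None:
    # Stand-in for a long-running job that hogs an executor thread
    stop.wait()


def quick() -> str:
    return "done"


async def main() -> tuple[bool, str]:
    loop = asyncio.get_running_loop()
    # Assumption for the demo: a default executor with only two threads
    loop.set_default_executor(ThreadPoolExecutor(max_workers=2))
    stop = threading.Event()
    hogs = [asyncio.create_task(asyncio.to_thread(long_lived, stop)) for _ in range(2)]
    await asyncio.sleep(0.05)  # give both jobs time to claim the two threads
    try:
        # quick() is queued behind the long-lived jobs and never gets a thread
        await asyncio.wait_for(asyncio.to_thread(quick), timeout=0.5)
        saturated = False
    except asyncio.TimeoutError:
        saturated = True
    stop.set()  # release the long-lived jobs
    await asyncio.gather(*hogs)
    # With the threads free again, the same call completes normally
    after = await asyncio.to_thread(quick)
    return saturated, after


saturated, after = asyncio.run(main())
print("saturated:", saturated, "| after release:", after)
```

With the real default executor the limit is higher, but the same thing happens once enough long-lived jobs pile up.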
Managing long-lived tasks
What do you do to run long-lived tasks in threads then? It’s simple! Thread them as you normally would in a synchronous program:
def serve_forever(addr):
    ...
async def main():
    thread = threading.Thread(target=serve_forever, args=(addr,))
    thread.start()
    # Profit???
…okay, but how do you communicate messages from your event loop to your thread? And how do you make sure the thread closes alongside your event loop? Well, this is where you should have a decent understanding of thread-safety as it relates to asyncio.
Let’s start with inter-thread communication in the form of queues and messages because it’s a versatile design pattern. How would we typically use a queue in a synchronous program that processes images in the background? Well, it might look like this:
import threading
from queue import Queue
class ImageProcessor:
    def __init__(self) -> None:
        self._queue: Queue[bytes | None] = Queue()
        self._thread: threading.Thread | None = None
    def __enter__(self):
        self.start()
        return self
    def __exit__(self, exc_type, exc_val, tb):
        assert self._thread is not None
        self.stop()
        self._thread.join()
    def start(self):
        self._thread = threading.Thread(target=self.run_forever)
        self._thread.start()
    def stop(self):
        self._queue.put(None)
    def run_forever(self):
        while True:
            item = self._queue.get()
            if item is None:
                break
            # Do some processing with the given image bytes...
            result = b"processed " + item
    def submit(self, image_bytes: bytes):
        self._queue.put(image_bytes)
with ImageProcessor() as worker:
    worker.submit(b"some image bytes")
Here, we instantiate the queue object in the main thread and then start the worker thread from our context manager, which blocks on the queue until items are submitted to it. The thread knows to stop when it receives a None sentinel value upon exiting the context manager.
Warning
For threads that handle I/O or otherwise anything that should be cleaned up upon exiting, please refrain from using daemon=True and starting your thread without joining it.
Yes, it means you don’t have to deal with checking when to stop, but it also makes your program prone to breaking in obscure ways when some missed teardown results in improperly closed connections or half-written files.
How do we translate this to asyncio? Let’s start with the simplest option, which is using the same code in a coroutine:
async def main():
    with ImageProcessor() as worker:
        worker.submit(b"some image bytes")
        await do_something_else()
asyncio.run(main())
For a simple script like this, you won’t notice any issues with it! That’s because we only have one task in our event loop, the main task. If do_something_else() were to receive an asyncio.CancelledError, perhaps caused by a keyboard interrupt, the context manager would tell the worker to shut down and join the thread, freezing the event loop during that period. The main task was the only thing running anyway, so nothing else got blocked.
However, if there were other tasks running at the same time:
async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(do_something_else())
        with ImageProcessor() as worker:
            worker.submit(b"some image bytes")
            await asyncio.sleep(3)
            raise Exception("Houston, we have a problem!")
Then, when exiting the worker, it would block the event loop until the remaining image was processed, preventing do_something_else() from running during that period. We could remove the _thread.join() call, but that would make it much more confusing to reason about the thread’s lifetime.
It’s also worth noting that queue.put() can block too, but since the queue doesn’t have a maximum size set, it’s effectively non-blocking.
So what do we do about it? Actually, not a lot is needed to avoid this problem. Because worker.submit() doesn’t need to be changed, the only problem we have is shutting down the worker thread. To fix that, we just need to do the same thing as in the earlier ThreadPoolExecutor example, which is to construct our image processor before starting the event loop:
async def main(worker: ImageProcessor):
    async with asyncio.TaskGroup() as tg:
        tg.create_task(do_something_else())
        worker.submit(b"some image bytes")
        await asyncio.sleep(3)
        raise Exception("Houston, we have a problem!")
with ImageProcessor() as worker:
    asyncio.run(main(worker))
Now if an error occurs, our event loop is able to run all remaining coroutines and shut down before our worker blocks and shuts down.
What if I want to create workers on the fly?
Right, we didn’t really solve the problem of managing threads from inside the event loop. To do that, we’ll need to know how to send events from the worker to the event loop first.
Because of how asyncio depends on callbacks being non-blocking, the way you communicate messages from your worker thread back to the event loop has to be different from how you send messages to the worker thread. You can’t just wait on a queue.Queue in asyncio, as that would block the event loop!
If only we had an asynchronous version of a queue to wait on…
You probably already guessed where this is going. Yes, asyncio has its own Queue class! But you can see what it says in the docs:
This class is not thread safe.
Oh no, it’s not thread-safe! That means we can’t put items from another thread, right? Actually, clicking the link in that text says we can, but not by putting items from the thread directly:
Almost all asyncio objects are not thread safe, which is typically not a problem unless there is code that works with them from outside of a Task or a callback. If there’s a need for such code to call a low-level asyncio API, the loop.call_soon_threadsafe() method should be used, e.g.: loop.call_soon_threadsafe(fut.cancel)
Okay, so this method is how we can call methods of asyncio objects from other threads.
But what makes this method thread-safe compared to calling queue.put_nowait() directly? Well, this comes down to what an event loop really does. You can guess from the name that there’s a loop, but if you look at its source code, you can see what happens in each iteration:
def _run_once(self):
    """Run one full iteration of the event loop.
    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """
    ...
    event_list = self._selector.select(timeout)
    self._process_events(event_list)
    ...
    for i in range(ntodo):
        handle = self._ready.popleft()
        ...
        handle._run()
It blocks on this selector object waiting for events to come in, and then runs all callbacks scheduled for that iteration. How the selector waits for events depends on the type of event loop that asyncio created, but by default, the DefaultEventLoopPolicy uses the selectors module on Unix and I/O Completion Ports on Windows. Both of them interface with the operating system’s APIs to wait on multiple data sources at once, whether those are network sockets or named pipes used in IPC.
Why does this matter to using asyncio.Queue from another thread? Well, you know how once a thread calls a function that blocks, that thread can’t do anything else? [1] The same is true for an event loop. If you were to call queue.put_nowait() from the worker thread while the event loop was waiting on an event, the item would get queued but nothing would happen, because the event loop is still blocked on the selector.
How does loop.call_soon_threadsafe() solve this issue? It schedules your callback to run on the event loop’s thread, and then sends one byte to a self-pipe which the selector is listening on:
def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe."""
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')
    handle = self._call_soon(callback, args, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    self._write_to_self()
    return handle
That wakes up the event loop, allowing it to run the callback you passed to the method along with anything else that was scheduled. If we were to pass queue.put_nowait as our callback, that would enqueue our item for us and wake up the first task waiting on queue.get(), scheduling the task to resume in the next iteration of the event loop.
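To make the self-pipe idea concrete, here’s a toy event loop built on the selectors module and socket.socketpair(). It is a simplified sketch of the mechanism, not asyncio’s actual implementation; ToyLoop and its methods are hypothetical names:

```python
import selectors
import socket
import threading
import time
from collections import deque
from typing import Any, Callable


class ToyLoop:
    """Toy event loop showing the self-pipe wakeup trick; not asyncio's real code."""

    def __init__(self) -> None:
        self._selector = selectors.DefaultSelector()
        # A connected socket pair plays the role of asyncio's self-pipe
        self._rsock, self._wsock = socket.socketpair()
        self._rsock.setblocking(False)
        self._selector.register(self._rsock, selectors.EVENT_READ)
        self._ready: deque = deque()
        self._lock = threading.Lock()
        self._stopping = False

    def call_soon_threadsafe(self, callback: Callable[..., Any], *args: Any) -> None:
        with self._lock:
            self._ready.append((callback, args))
        # One byte makes the read end readable, which wakes up select()
        self._wsock.send(b"\0")

    def stop(self) -> None:
        self._stopping = True

    def run_forever(self) -> None:
        while not self._stopping:
            # Block with no timeout until a registered socket is readable
            for key, _events in self._selector.select(timeout=None):
                key.fileobj.recv(4096)  # drain the wakeup bytes
            with self._lock:
                ready, self._ready = self._ready, deque()
            for callback, args in ready:
                callback(*args)

    def close(self) -> None:
        self._selector.close()
        self._rsock.close()
        self._wsock.close()


loop = ToyLoop()
received: list = []
thread = threading.Thread(target=loop.run_forever)
thread.start()
time.sleep(0.1)  # by now the loop thread is blocked inside select()
loop.call_soon_threadsafe(received.append, "hello from the main thread")
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
print(received)
```

Without the send() in call_soon_threadsafe(), the loop thread would sit in select() forever and never run either callback.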
So that’s everything we need! Let’s put that method into practice. We can add a way to register callbacks on our worker that run when an item has finished processing:
from typing import Callable
ImageProcessorCallback = Callable[[bytes], object]
class ImageProcessor:
    def __init__(self) -> None:
        self._queue: Queue[bytes | None] = Queue()
        self._thread: threading.Thread | None = None
        self._callbacks: list[ImageProcessorCallback] = []
    def __enter__(self):
        self.start()
        return self
    def __exit__(self, exc_type, exc_val, tb):
        assert self._thread is not None
        self.stop()
        self._thread.join()
    def start(self):
        self._thread = threading.Thread(target=self.run_forever)
        self._thread.start()
    def stop(self):
        self._queue.put(None)
    def add_done_callback(self, callback: ImageProcessorCallback):
        self._callbacks.append(callback)
    def run_forever(self):
        while True:
            item = self._queue.get()
            if item is None:
                break
            result = b"processed " + item
            for callback in self._callbacks:
                callback(result)
    def submit(self, image_bytes: bytes):
        self._queue.put(image_bytes)
And then use it to redirect results to an asynchronous queue:
async def main(worker: ImageProcessor):
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[bytes] = asyncio.Queue()
    worker.add_done_callback(
        lambda result: loop.call_soon_threadsafe(queue.put_nowait, result)
    )
    worker.submit(b"some image bytes")
    result = await queue.get()
    print("processed image bytes:", result)
And there you go! This uses a thread-safe queue and non-blocking callbacks to communicate events between both threads at once. We can wrap it up in another class to make this interaction simpler:
class AsyncImageProcessor:
    def __init__(self, worker: ImageProcessor):
        self._worker = worker
        self._queue: asyncio.Queue[bytes] = asyncio.Queue()
        self._loop = asyncio.get_running_loop()
        self._worker.add_done_callback(self._on_item_processed)
    def _on_item_processed(self, result: bytes):
        self._loop.call_soon_threadsafe(self._queue.put_nowait, result)
    async def submit(self, item: bytes) -> bytes:
        self._worker.submit(item)
        return await self._queue.get()
        # Because the worker processes items sequentially and queue.get()
        # wakes up waiters in FIFO order, our results will be in the same order.
        # This would stop being true if items could be returned out of order.
        #
        # The callback could also use something besides an asyncio.Queue to
        # store results and notify waiters, like pairing each item with an
        # asyncio future.
async def main(sync_worker: ImageProcessor):
    worker = AsyncImageProcessor(sync_worker)
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for i in range(5):
            image = f"image {i}".encode()
            task = tg.create_task(worker.submit(image))
            tasks.append(task)
    for task in asyncio.as_completed(tasks):
        print(await task)
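The comment in submit() mentions pairing each item with an asyncio future instead of sharing one queue. Here’s a minimal sketch of that variant. It assumes a hypothetical CallbackWorker that accepts a per-item callback (unlike the ImageProcessor above), so each caller gets its own result regardless of the order in which items finish:

```python
import asyncio
import threading
from queue import Queue
from typing import Callable, Optional, Tuple

# Hypothetical worker: each job carries the callback for its own result
Job = Tuple[bytes, Callable[[bytes], object]]


class CallbackWorker:
    def __init__(self) -> None:
        self._queue: Queue[Optional[Job]] = Queue()
        self._thread = threading.Thread(target=self._run)

    def __enter__(self) -> "CallbackWorker":
        self._thread.start()
        return self

    def __exit__(self, *exc_info: object) -> None:
        self._queue.put(None)  # sentinel: stop after the remaining jobs
        self._thread.join()

    def submit(self, item: bytes, callback: Callable[[bytes], object]) -> None:
        self._queue.put((item, callback))

    def _run(self) -> None:
        while True:
            job = self._queue.get()
            if job is None:
                break
            item, callback = job
            callback(b"processed " + item)  # runs in the worker thread


def _set_if_pending(fut: asyncio.Future, result: bytes) -> None:
    # The awaiting task may have been cancelled in the meantime
    if not fut.done():
        fut.set_result(result)


async def submit_async(worker: CallbackWorker, item: bytes) -> bytes:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # The worker thread only hands the result over; the loop thread sets the future
    worker.submit(item, lambda result: loop.call_soon_threadsafe(_set_if_pending, fut, result))
    return await fut


async def main(worker: CallbackWorker) -> list:
    coros = [submit_async(worker, f"image {i}".encode()) for i in range(5)]
    return await asyncio.gather(*coros)


with CallbackWorker() as worker:
    results = asyncio.run(main(worker))
print(results)
```

Because every future belongs to exactly one submission, this design keeps working even if a worker were changed to finish items out of order.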
Note
Hey, what about queue.put()? Can I pass that as a callback too?
From asyncio’s perspective, callbacks are just synchronous functions. queue.put() returns a coroutine object, and the event loop doesn’t actually know how to run a coroutine object by itself. That’s what an asyncio.Task is for; it knows how to turn the “steps” in a coroutine into callbacks, and schedule them on the event loop.
There’s more to investigate here about how futures bridge the event loop’s low-level callbacks with high-level async/await coroutines, but I won’t discuss it in this guide. Just know that there’s a separate function for scheduling coroutines from other threads, run_coroutine_threadsafe(), which has the handy feature of returning a concurrent.futures.Future that lets you wait on the coroutine’s result from another thread.
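As a small sketch of run_coroutine_threadsafe() in this direction, from a worker thread into the event loop, here the worker schedules the coroutine queue.put() on a bounded asyncio.Queue and blocks on the returned concurrent.futures.Future. The bounded queue (maxsize=1) is an assumption chosen so that put() genuinely has to wait:

```python
import asyncio
import threading


async def main() -> list:
    loop = asyncio.get_running_loop()
    # maxsize=1 means put() may have to wait for a get(), hence the coroutine
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    def worker() -> None:
        for i in range(3):
            # queue.put(i) is a coroutine, so schedule it with
            # run_coroutine_threadsafe() instead of call_soon_threadsafe()
            fut = asyncio.run_coroutine_threadsafe(queue.put(i), loop)
            fut.result(timeout=5)  # blocks this worker thread, not the loop

    thread = threading.Thread(target=worker)
    thread.start()
    results = [await queue.get() for _ in range(3)]
    await asyncio.to_thread(thread.join)  # join without blocking the loop
    return results


results = asyncio.run(main())
print(results)  # [0, 1, 2]
```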
There’s just one thing left that I didn’t answer: What if I want to create workers on the fly? Or in other words, how do I manage worker threads inside the event loop?
Now that we know how to send events to the event loop from worker threads, how would you modify the above classes so they can asynchronously start and stop the worker inside main(), using async with syntax?
When implemented, its usage should look like:
async def main():
    tasks = []
    async with AsyncImageProcessor() as worker, asyncio.TaskGroup() as tg:
        for i in range(5):
            image = f"image {i}".encode()
            task = tg.create_task(worker.submit(image))
            tasks.append(task)
        for task in asyncio.as_completed(tasks):
            print(await task)
asyncio.run(main())
If you want to look now, here’s a way of solving it:
Solution
ImageProcessorCallback = Callable[[bytes], object]
ImageProcessorStopCallback = Callable[[], object]
class ImageProcessor:
    def __init__(self) -> None:
        self._queue: Queue[bytes | None] = Queue()
        self._thread: threading.Thread | None = None
        self._callbacks: list[ImageProcessorCallback] = []
        self._stop_callbacks: list[ImageProcessorStopCallback] = []
    def __enter__(self):
        self.start()
        return self
    def __exit__(self, exc_type, exc_val, tb):
        assert self._thread is not None
        self.stop()
        self._thread.join()
    def start(self):
        self._thread = threading.Thread(target=self.run_forever)
        self._thread.start()
    def stop(self):
        self._queue.put(None)
    def add_done_callback(self, callback: ImageProcessorCallback):
        self._callbacks.append(callback)
    def add_stop_callback(self, callback: ImageProcessorStopCallback):
        self._stop_callbacks.append(callback)
    def run_forever(self):
        while True:
            item = self._queue.get()
            if item is None:
                break
            result = b"processed " + item
            for callback in self._callbacks:
                callback(result)
        for callback in self._stop_callbacks:
            callback()
    def submit(self, image_bytes: bytes):
        self._queue.put(image_bytes)
class AsyncImageProcessor:
    def __init__(self, factory: Callable[[], ImageProcessor] = ImageProcessor):
        self.factory = factory
        self._queue: asyncio.Queue[bytes] = asyncio.Queue()
        self._loop = asyncio.get_running_loop()
        self._worker: ImageProcessor | None = None
        self._stop_ev = asyncio.Event()
    async def __aenter__(self):
        self._worker = self.factory()
        self._worker.add_done_callback(self._on_item_processed)
        self._worker.add_stop_callback(self._on_stop)
        self._worker.start()
        return self
    async def __aexit__(self, exc_type, exc_val, tb):
        assert self._worker is not None
        self._worker.stop()
        await self._stop_ev.wait()
    def _on_item_processed(self, result: bytes):
        self._loop.call_soon_threadsafe(self._queue.put_nowait, result)
    def _on_stop(self):
        self._loop.call_soon_threadsafe(self._stop_ev.set)
    async def submit(self, item: bytes) -> bytes:
        assert self._worker is not None
        self._worker.submit(item)
        return await self._queue.get()
Running event loops in other threads
In the previous sections I discussed creating threads to be used by the asyncio event loop, but what if you have a synchronous program that you can’t migrate to asyncio and yet you still need to run asynchronous code inside it?
Before we start threading, let’s understand the ways we can run coroutines in our main thread. First and foremost is asyncio.run():
import asyncio
import httpx
async def request(url: str) -> bytes:
    async with httpx.AsyncClient() as client:
        ...
first = asyncio.run(request("https://example.com"))
second = asyncio.run(request("https://sub.example.com"))
asyncio.run() starts a new event loop each time, runs your coroutine to completion, cleans up the event loop, and then returns the result of your coroutine. This can be sufficient for some scenarios, but it requires the coroutine to set up any asynchronous resources from within the event loop. asyncio’s primitives, whether they be locks, queues, or sockets, are tightly coupled to the event loop they’re created or first used in. Trying to re-use them across event loops can lead to obscure errors as a result of callbacks running on different event loops:
import asyncio
async def new_fut() -> asyncio.Future:
    fut = asyncio.get_running_loop().create_future()
    fut.add_done_callback(lambda fut: print("fut finished"))
    return fut
async def set_fut(fut: asyncio.Future):
    fut.set_result("Done!")
fut = asyncio.run(new_fut())
asyncio.run(set_fut(fut))
Traceback (most recent call last):
  File "main.py", line 12, in <module>
    asyncio.run(set_fut(fut))
  File "Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "Lib\asyncio\base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "main.py", line 9, in set_fut
    fut.set_result("Done!")
  File "Lib\asyncio\base_events.py", line 762, in call_soon
    self._check_closed()
  File "Lib\asyncio\base_events.py", line 520, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
This example looks a bit weird since we’re just passing a future around, but in a practical situation this can happen indirectly when you re-use some instance from an async library, like an HTTP client.
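You can check that each asyncio.run() call gets a brand-new event loop that is already closed by the time the call returns. current_loop() is just a helper for this check:

```python
import asyncio


async def current_loop() -> asyncio.AbstractEventLoop:
    return asyncio.get_running_loop()


# Keep references to both loops so they can be compared afterwards
first = asyncio.run(current_loop())
second = asyncio.run(current_loop())
print(first is second)  # False: each call created its own loop
print(first.is_closed(), second.is_closed())  # True True: both were closed on return
```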
With Python 3.11 and newer, the asyncio.Runner class can avoid this issue by re-using one event loop to run multiple coroutines:
with asyncio.Runner() as runner:
    fut = runner.run(new_fut())
    runner.run(set_fut(fut))
This lets you start, pause, and resume the event loop while ensuring that it gets cleaned up once you exit the context manager. For older versions, you had to create the event loop yourself:
loop = asyncio.new_event_loop()
fut = loop.run_until_complete(new_fut())
loop.run_until_complete(set_fut(fut))
loop.close()
However, managing the event loop like above is notoriously difficult to get right. The above example, which you might see often in other codebases, has multiple problems with it:
Current event loop not set for the thread
Execution and teardown not enclosed in try/finally
Asynchronous generators not closed
Default executor not closed
Remaining tasks not cancelled and waited on
In short, it has a high risk of not executing try/finally clauses and leaving behind unclosed resources (see this gist for an example). If you really need to do this, I suggest vendoring the Runner class from CPython’s source code, or re-purposing asyncio.run()’s implementation as it existed in 3.10 and older.
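If you do end up managing the loop by hand, here’s a rough sketch loosely modeled on asyncio.run()’s 3.10 implementation. It sets the current loop, wraps everything in try/finally, cancels leftover tasks and waits for them, shuts down async generators and the default executor (Python 3.9+), and finally closes the loop. It leaves out details such as reporting exceptions raised by the cancelled tasks, and background() and main() are placeholder coroutines:

```python
import asyncio

cleaned_up: list = []
leftover: list = []


async def background() -> None:
    try:
        await asyncio.sleep(3600)
    finally:
        # Only runs if the task is cancelled and then given a chance to finish
        cleaned_up.append("background")


async def main() -> str:
    # Placeholder: a task that main() starts but never awaits
    leftover.append(asyncio.create_task(background()))
    await asyncio.sleep(0)  # let background() start and reach its sleep
    return "main done"


def cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
    to_cancel = asyncio.all_tasks(loop)
    if not to_cancel:
        return
    for task in to_cancel:
        task.cancel()
    # Run the loop until every cancelled task has executed its finally clauses
    loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    result = loop.run_until_complete(main())
finally:
    try:
        cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.run_until_complete(loop.shutdown_default_executor())
    finally:
        asyncio.set_event_loop(None)
        loop.close()

print(result, cleaned_up)  # main done ['background']
```

Without the cancel_all_tasks() step, background()’s finally clause would never run before the loop closes.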
Regardless of which method you choose, they all have the same limitation in that the main thread cannot call other functions while it is executing coroutines on the event loop. If you could move the event loop’s execution to another thread, that would free up the main thread to execute other functions. So, how do we make another thread run our event loop?
Well, the first thing you could start with is setting the thread’s target= function to asyncio.run() and passing it a coroutine to execute:
async def func():
    ...
coro = func()
thread = threading.Thread(target=asyncio.run, args=(coro,))
thread.start()
But we know that asyncio.run() can only run one coroutine before it closes the event loop. How do we give it more than one coroutine to run?
We need a way to run coroutines while keeping the event loop alive…
What if we give it a coroutine that runs forever? Let’s see:
async def run_forever():
    while True:
        await asyncio.sleep(60)
thread = threading.Thread(target=asyncio.run, args=(run_forever(),))
thread.start()
Now the event loop is running indefinitely, but we don’t have a way to pass it coroutines. If you followed along in the previous sections, you might recall the asyncio.run_coroutine_threadsafe() function I briefly mentioned. It exists specifically to let threads submit coroutines to an event loop and wait on their results if needed. But if you look at its signature, you’ll see it needs a coroutine object and the event loop object. We don’t have an event loop object in our main thread, so we need a way to have the worker thread send us an event loop object back.
We can do this in many ways, but let’s try a simple one, which is assigning the loop to an attribute and then setting an event:
class EventThread:
    def __init__(self):
        self._thread: threading.Thread | None = None
        self._loop: asyncio.AbstractEventLoop | None = None
        self._event = threading.Event()
    def start(self):
        self._thread = threading.Thread(target=asyncio.run, args=(self._run_forever(),), daemon=True)
        self._thread.start()
    def get_loop(self) -> asyncio.AbstractEventLoop:
        self._event.wait()
        assert self._loop is not None
        return self._loop
    async def _run_forever(self):
        self._set_event_loop()
        while True:
            await asyncio.sleep(60)
    def _set_event_loop(self):
        self._loop = asyncio.get_running_loop()
        self._event.set()
And putting our EventThread class into practice:
async def func():
    await asyncio.sleep(1)
    print("Hello from event loop!")
    return 1234
thread = EventThread()
thread.start()
loop = thread.get_loop()
fut = asyncio.run_coroutine_threadsafe(func(), loop)
print("Main thread waiting on coroutine...")
result = fut.result()
print("Main thread received:", result)
Main thread waiting on coroutine...
Hello from event loop!
Main thread received: 1234
Great! We have an event loop that runs forever and a way to submit coroutines to it. The last thing we’re missing is a way to gracefully stop the thread. If you read the class carefully, you’ll notice that the thread had daemon=True set, which means it gets abruptly killed when the interpreter exits, risking improper cleanup just like the previous asyncio.new_event_loop() example. To avoid that, we need a way to send a signal to our _run_forever() method so it knows to exit.
Given that we used an event to notify the main thread of our event loop object, let’s try to do the same thing here but with asyncio.Event instead.
As per the last section, we know that we can call the event’s set() method from our main thread using loop.call_soon_threadsafe(event.set) instead of asyncio.run_coroutine_threadsafe(), since event.set() isn’t a coroutine.
You could wrap event.set() in a coroutine function, but we’ll stick with call_soon_threadsafe() since it’s simpler:
class EventThread:
    def __init__(self):
        self._loop: asyncio.AbstractEventLoop | None = None
        self._event = threading.Event()
        self._thread: threading.Thread | None = None
        self._stop_ev: asyncio.Event | None = None
    def start(self):
        coro = self._run_forever()
        self._thread = threading.Thread(target=asyncio.run, args=(coro,))
        self._thread.start()
    def get_loop(self) -> asyncio.AbstractEventLoop:
        self._event.wait()
        assert self._loop is not None
        return self._loop
    def stop(self):
        # get_loop() waits until _run_forever() has created the stop event,
        # so stop() is safe to call right after start()
        loop = self.get_loop()
        assert self._stop_ev is not None
        assert self._thread is not None
        loop.call_soon_threadsafe(self._stop_ev.set)
        self._thread.join()
    async def _run_forever(self):
        self._stop_ev = asyncio.Event()
        self._set_event_loop()
        await self._stop_ev.wait()
    def _set_event_loop(self):
        self._loop = asyncio.get_running_loop()
        self._event.set()
And now we can stop the event loop at the end:
thread = EventThread()
thread.start()
loop = thread.get_loop()
fut = asyncio.run_coroutine_threadsafe(func(), loop)
print("Main thread waiting on coroutine...")
result = fut.result()
print("Main thread received:", result)
thread.stop()
print("Event loop stopped")
Main thread waiting on coroutine...
Hello from event loop!
Main thread received: 1234
Event loop stopped
Finally, we have a class that can run an event loop indefinitely and let us submit any coroutine to it! 🎉
Refactoring our EventThread class
We’re nearing the end of this guide, but I want to suggest some improvements to this class. For one, the start() and stop() methods should be turned into a context manager so it’s harder to accidentally leave it unclosed. It could also use a submit() method that handles calling asyncio.run_coroutine_threadsafe() for us.
But I want to talk about the more interesting topic, futures.
First, what is a future? According to asyncio.Future’s docs:
A Future represents an eventual result of an asynchronous operation.
What does that mean? In Python, this Future class is something you can call set_result() on with some arbitrary value, and you can await it from a coroutine to get its result. It also has an add_done_callback() method to make stuff run when the result is set.
In other words, it’s kind of like a one-time event that stores a value when it’s done. If you look back at the EventThread class:
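Before that, here’s a minimal sketch of those three pieces working together: a result set later with set_result(), awaited from a coroutine, and observed by a done callback:

```python
import asyncio


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    seen = []
    # Done callbacks receive the future itself once it has a result
    fut.add_done_callback(lambda f: seen.append(f.result()))
    # Set the result a bit later, as if some I/O operation had completed
    loop.call_later(0.01, fut.set_result, "hello")
    value = await fut  # suspends until set_result() is called
    return value, seen


value, seen = asyncio.run(main())
print(value, seen)  # hello ['hello']
```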
class EventThread:
    def __init__(self):
        self._loop: asyncio.AbstractEventLoop | None = None
        self._event = threading.Event()
        ...
    def _set_event_loop(self):
        self._loop = asyncio.get_running_loop()
        self._event.set()
We have a threading.Event that gets set right after the _loop attribute is assigned a value. See what I’m getting at? We’ve manually combined an event and an attribute to essentially hand-craft our own future, except that it’s for synchronous threads rather than asyncio.
What if we had a full-fledged synchronous future object that worked like asyncio.Future?
Well, you’ve been looking at that since the very start of this guide! The concurrent.futures package has its own implementation of futures used to transfer results from worker threads back to the caller:
with ThreadPoolExecutor() as executor:
    fut = executor.submit(my_func, arg1, arg2)
    fut.result()
What it returns is a concurrent.futures.Future object, and calling .result() is how we wait for its result (since we can’t use await outside of async def functions). It also has the same add_done_callback() method to let us add our own callbacks!
Can we create our own futures and use them for other things?
Here’s the awkward bit: the documentation says:
Future instances are created by Executor.submit() and should not be created directly except for testing.
Supposedly, then, we shouldn’t create them ourselves outside of tests. But look at how asyncio.run_coroutine_threadsafe() is implemented in CPython:
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future
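The set_running_or_notify_cancel() call in that function is worth copying when you produce a future's result yourself. It claims the future before any work is done and returns False if a consumer already cancelled it. Once it has returned True, cancel() can no longer succeed. The sketch below assumes a hypothetical produce() helper; its shape mirrors how ThreadPoolExecutor's workers fill in their futures.

```python
import threading
from concurrent.futures import CancelledError, Future
from typing import Callable


def produce(fut: Future, compute: Callable[[], object]) -> None:
    # Claim the future before doing any work; False means it was cancelled
    if not fut.set_running_or_notify_cancel():
        return  # the consumer gave up, so skip the work entirely
    try:
        result = compute()
    except BaseException as exc:
        fut.set_exception(exc)  # re-raised for whoever calls .result()
    else:
        fut.set_result(result)


# A future nobody cancelled: the producer thread fills it in
ok = Future()
t = threading.Thread(target=produce, args=(ok, lambda: 6 * 7))
t.start()
print(ok.result())  # blocks until the producer calls set_result()
t.join()

# A future cancelled before any work starts: produce() returns early
skipped = Future()
skipped.cancel()
produce(skipped, lambda: 6 * 7)
try:
    skipped.result()
except CancelledError:
    print("skipped was cancelled")
```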
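run_coroutine_threadsafe() creates its future by simply calling concurrent.futures.Future() with no arguments, so it’s not actually that difficult to set up the class. You’ll also notice there’s a function in asyncio, asyncio.wrap_future(), specifically meant to convert these Future objects into their asyncio equivalents: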
asyncio.wrap_future(future, *, loop=None)
Wrap a concurrent.futures.Future object in an asyncio.Future object.
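As a rough sketch, this is how a coroutine can await a result that a plain thread produces. The names worker() and main() are hypothetical, and time.sleep() stands in for real blocking work.

```python
import asyncio
import concurrent.futures
import threading
import time


def worker(fut: concurrent.futures.Future) -> None:
    time.sleep(0.1)  # stand-in for blocking work
    fut.set_result("done in a thread")  # safe to call from any thread


async def main() -> str:
    cfut = concurrent.futures.Future()
    threading.Thread(target=worker, args=(cfut,)).start()
    # wrap_future() returns an asyncio.Future tied to the running loop,
    # so this coroutine can await a result set by a non-asyncio thread
    return await asyncio.wrap_future(cfut)


print(asyncio.run(main()))
```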
Admittedly, I can’t guarantee that creating concurrent.futures.Future objects directly won’t break in future versions of Python, but regardless, it’s perfect for our EventThread class, which needs to broadcast a single value from its thread.
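In isolation, that broadcast looks roughly like the following sketch. config_fut and reader() are hypothetical names. Every thread that calls .result() blocks until the value is published and then receives the same object. A second set_result() raises concurrent.futures.InvalidStateError (Python 3.8+), a guarantee that an Event plus an attribute doesn't give you.

```python
import concurrent.futures
import threading

# One future publishes a single value to any number of waiting threads
config_fut = concurrent.futures.Future()
seen: list = []
seen_lock = threading.Lock()


def reader() -> None:
    value = config_fut.result()  # blocks until the value is published
    with seen_lock:
        seen.append(value)


readers = [threading.Thread(target=reader) for _ in range(3)]
for t in readers:
    t.start()

config_fut.set_result({"port": 8080})  # publish the value exactly once
for t in readers:
    t.join()
print(seen)

# Publishing again is rejected instead of silently overwriting the value
second_publish_rejected = False
try:
    config_fut.set_result({"port": 9090})
except concurrent.futures.InvalidStateError:
    second_publish_rejected = True
print(second_publish_rejected)
```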
Let’s replace our _event and _loop attributes with a single _loop_fut attribute:
import asyncio
import concurrent.futures
import threading
from typing import Awaitable, TypeVar

T = TypeVar("T")


class EventThread:
    _loop_fut: concurrent.futures.Future[asyncio.AbstractEventLoop]

    def __init__(self):
        self._thread: threading.Thread | None = None
        self._stop_ev: asyncio.Event | None = None
        self._loop_fut = concurrent.futures.Future()

    def __enter__(self):
        coro = self._run_forever()
        self._thread = threading.Thread(target=asyncio.run, args=(coro,))
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, tb):
        assert self._thread is not None
        # Wait for the loop first: _run_forever() creates _stop_ev before
        # publishing the loop, so _stop_ev is guaranteed to exist afterwards
        loop = self.get_loop()
        assert self._stop_ev is not None
        loop.call_soon_threadsafe(self._stop_ev.set)
        self._thread.join()

    def get_loop(self) -> asyncio.AbstractEventLoop:
        # Blocks until the event loop thread has published its loop
        return self._loop_fut.result()

    def submit(self, coro: Awaitable[T]) -> concurrent.futures.Future[T]:
        return asyncio.run_coroutine_threadsafe(coro, self.get_loop())

    async def _run_forever(self):
        self._stop_ev = asyncio.Event()
        self._set_event_loop()
        await self._stop_ev.wait()

    def _set_event_loop(self):
        self._loop_fut.set_result(asyncio.get_running_loop())
See how nicely that works? We can go one step further and replace _stop_ev with another future, which the event loop awaits through asyncio.wrap_future():
class EventThread:
    _loop_fut: concurrent.futures.Future[asyncio.AbstractEventLoop]
    _stop_fut: concurrent.futures.Future[None]

    def __init__(self):
        self._thread: threading.Thread | None = None
        self._loop_fut = concurrent.futures.Future()
        self._stop_fut = concurrent.futures.Future()

    def __enter__(self):
        coro = self._run_forever()
        self._thread = threading.Thread(target=asyncio.run, args=(coro,))
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, tb):
        assert self._thread is not None
        # Resolving the future from this thread wakes up _run_forever()
        self._stop_fut.set_result(None)
        self._thread.join()

    def get_loop(self) -> asyncio.AbstractEventLoop:
        return self._loop_fut.result()

    def submit(self, coro: Awaitable[T]) -> concurrent.futures.Future[T]:
        return asyncio.run_coroutine_threadsafe(coro, self.get_loop())

    async def _run_forever(self):
        self._set_event_loop()
        await asyncio.wrap_future(self._stop_fut)

    def _set_event_loop(self):
        self._loop_fut.set_result(asyncio.get_running_loop())
And just for fun, let’s use a third future to allow the main thread (or other threads) to add callbacks that run when the event loop stops:
from typing import Callable

EventThreadCallback = Callable[[concurrent.futures.Future[None]], object]


class EventThread:
    _loop_fut: concurrent.futures.Future[asyncio.AbstractEventLoop]
    _stop_fut: concurrent.futures.Future[None]
    _finished_fut: concurrent.futures.Future[None]

    def __init__(self):
        self._thread: threading.Thread | None = None
        self._loop_fut = concurrent.futures.Future()
        self._stop_fut = concurrent.futures.Future()
        self._finished_fut = concurrent.futures.Future()

    def __enter__(self):
        coro = self._run_forever()
        self._thread = threading.Thread(target=asyncio.run, args=(coro,))
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, tb):
        assert self._thread is not None
        self._stop_fut.set_result(None)
        self._thread.join()

    def get_loop(self) -> asyncio.AbstractEventLoop:
        return self._loop_fut.result()

    def submit(self, coro: Awaitable[T]) -> concurrent.futures.Future[T]:
        return asyncio.run_coroutine_threadsafe(coro, self.get_loop())

    def add_done_callback(self, callback: EventThreadCallback):
        # Runs on the event loop thread when it stops (or immediately, if it already has)
        self._finished_fut.add_done_callback(callback)

    async def _run_forever(self):
        self._set_event_loop()
        try:
            await asyncio.wrap_future(self._stop_fut)
        finally:
            self._finished_fut.set_result(None)

    def _set_event_loop(self):
        self._loop_fut.set_result(asyncio.get_running_loop())
This is what its usage will look like:
with EventThread() as thread:
    thread.add_done_callback(lambda fut: print("Event loop stopped"))
    fut = thread.submit(func())
    print("Main thread waiting on coroutine...")
    result = fut.result()
    print("Main thread received:", result)
Caution
Callbacks added to a concurrent.futures.Future run on whichever thread calls its set_result() method. If the future has already finished, they run immediately on the thread calling add_done_callback(). As such, you should make sure to only add callbacks that are thread-safe, such as a function that sets a threading.Event or pushes to a queue.Queue.
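A small sketch of that rule follows. on_done() and the "setter" thread name are hypothetical. A callback added before completion runs on the thread that calls set_result(). Per the concurrent.futures docs, a callback added after completion is called immediately on the thread adding it. Pushing to a queue.Queue keeps the callback thread-safe either way.

```python
import concurrent.futures
import queue
import threading

events: queue.Queue = queue.Queue()  # thread-safe hand-off to the main thread


def on_done(fut: concurrent.futures.Future) -> None:
    # Only thread-safe work here: record the running thread and the result
    events.put((threading.current_thread().name, fut.result()))


fut = concurrent.futures.Future()
fut.add_done_callback(on_done)  # not done yet, so the callback is stored

setter = threading.Thread(target=fut.set_result, args=(42,), name="setter")
setter.start()
setter.join()
first = events.get(timeout=1)  # recorded by the "setter" thread
print(first)

fut.add_done_callback(on_done)  # already done, so it runs right here
second = events.get(timeout=1)  # recorded by the current thread
print(second)
```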
Footnotes