Asyncio Finally Got Peewee
Which way I fly is hell; myself am hell;
And in the lowest deep a lower deep
Still threat'ning to devour me opens wide,
To which the hell I suffer seems a heav'n.
I feel that it is high time that Peewee had an asyncio story. I've avoided this for years because asyncio is, in my opinion, a lousy design for anything beyond massively concurrent I/O multiplexing. Database operations and transactions, while needing to support some degree of concurrency (SQLite doesn't even manage that), are still rather heavyweight - Postgres' default config specifies a max of 100 connections. Nonetheless, the writing has been on the wall for years now, and much like toml, type whispering, and the withering bouquet of packaging tools waiting to be tossed, asyncio is not going anywhere. Peewee remains a synchronous ORM, but in order to work with the ever-widening sphere of async-first web frameworks and database drivers, it was time to come up with a plan.
That inscrutable thing is chiefly what I hate; and be the white whale agent, or be the white whale principal, I will wreak that hate upon him.
While I still prefer threads or gevent, the
implementation in playhouse/pwasyncio.py presents a fairly manageable surface
area (at time of writing under 500 lines of code). Over time I'm sure this will
grow, but this is a beginning at any rate.
The main point is that the asyncio implementation doesn't affect
the ORM. Query objects, working with models, transactions, and resolving
relationships all continue to behave the way they always have. Asyncio happens
only during calls to execute_sql(), the lowest-common-denominator for
operations in Peewee. To accomplish this, I've borrowed the same approach
SQLAlchemy is using. It uses greenlet (not gevent!) to wrap
synchronous code in something we can suspend and resume when blocking occurs.
Control can then be passed back-and-forth between our greenlet and the asyncio
event loop without needing to rewrite everything top-to-bottom.
This design makes it so that, rather than rewriting Peewee to be async-friendly, we instead have a thin compatibility layer that bridges synchronous Peewee with the asyncio event loop and programming model.
import asyncio

from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db')

# Normal Peewee model definition, nothing special.
class User(db.Model):
    name = TextField()

async def demo():
    async with db:
        # Asynchronously create table(s).
        await db.acreate_tables([User])

        # Create a new user.
        user = await db.run(User.create, name='Charlie')

        # Retrieve new user from the database.
        user_db = await db.get(User.select().where(User.name == 'Charlie'))
        assert user.name == user_db.name == 'Charlie'

        # Atomicity with async context managers.
        async with db.atomic():
            # Construct a normal Peewee INSERT query.
            iq = (User
                  .insert_many([{'name': 'Alice'}, {'name': 'Bob'}])
                  .returning(User))

            # Execute query asynchronously, retrieving results.
            users = await db.aexecute(iq)
            print('Added users: %s' % list(users))

        # Retrieve list of users from database.
        for user in await db.list(User.select().order_by(User.name)):
            print(user.name)

    await db.close_pool()

asyncio.run(demo())
When running Peewee ORM code, the most general-purpose method to make things non-blocking is to write your code synchronously and pass a callable to the run() method, e.g.:
def do_database_stuff():
    # Any normal, synchronous Peewee code.
    user = User.create(username='huey')
    for i in range(3):
        Tweet.create(user=user, content='meow %d' % i)
    return user

# All queries will be executed without blocking the event loop!
huey = await db.run(do_database_stuff)
Additionally, you can use the various async helpers:
users = await db.list(User.select())
count = await db.scalar(User.select(fn.COUNT(User.id)))
user = await db.get(User.select().where(User.id == 1))
pk = await db.aexecute(User.insert(name='Huey'))
At the time of writing, the following async helpers are available on the Database objects:
- aconnect() - acquire a connection from the pool for the current task.
- aclose() - release the connection back to the pool.
- close_pool() - close the connection pool.
- atomic() - can be used as an async context manager for creating arbitrarily nested transactions.
- get() - get a single model instance or result.
- list() - get the results of a query (0…many).
- scalar() - get a single, scalar value from a query.
- aexecute() - execute a Query object.
- aexecute_sql() - execute a SQL query.
- acreate_tables() - create one or more tables.
- adrop_tables() - drop one or more tables.
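To illustrate a few helpers the first example did not cover, here is a sketch of explicit connection management. This is illustrative only; the async with db block in the earlier example performs the same acquire/release automatically.

# A sketch of explicit connection management using the helpers above.
await db.aconnect()          # acquire a connection for the current task
try:
    await db.acreate_tables([User])
    count = await db.scalar(User.select(fn.COUNT(User.id)))
    print('user count:', count)
finally:
    await db.aclose()        # release the connection back to the pool

# When shutting down, close the pool entirely.
await db.close_pool()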
For more details, see the documentation.
How does it work?
Synchronous ORM code runs inside a greenlet, and async I/O is bridged
explicitly by Peewee using two helpers, greenlet_spawn() and await_(). The
greenlet_spawn() helper runs synchronous code, but can be suspended and
resumed in order to yield to the asyncio event loop. Yielding is done by the
await_() helper, which suspends the greenlet and passes control to the
asyncio coroutine. View the code to see the full implementation.
Here's a small example showing the execution path and how context switches between sync and async worlds:
import asyncio
from playhouse.pwasyncio import *

# Synchronous function gets run by the greenlet helper.
def synchronous():
    return await_(a_add(1, 2))

# Normal async function.
async def a_add(x, y):
    await asyncio.sleep(1)
    return x + y

async def main():
    result = await greenlet_spawn(synchronous)
    print(result)

asyncio.run(main())  # After 1 second, prints 3
When this runs:
- main() is declared async def and runs in the "async" world. It awaits greenlet_spawn, passing in our sync callable.
- Inside greenlet_spawn we create a new greenlet to run our synchronous callable, with its parent greenlet set to our async greenlet runner.
- The greenlet begins executing our synchronous code - it does NOT know anything about asyncio. It hits await_(a_add(1, 2)). When we call a_add(1, 2) Python gives us a coroutine, which gets passed to await_().
- Inside the await_() helper we switch contexts back to the parent "async" world, passing our a_add() coroutine.
- Back in the async world, we await the coroutine. The event loop runs a_add() and gives us back the result.
- The greenlet_spawn() helper then passes that result back into the synchronous world as the return value of our await_() call. Synchronous execution resumes and returns the result.
- At this point the greenlet running our synchronous code has finished. greenlet_spawn() now finishes and returns the result (3) to main(), which gets printed. A minimal sketch of this bridge is shown below.
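To make the control flow concrete, here is a minimal sketch of how such a bridge can be built with the greenlet library. This is not the actual code in playhouse/pwasyncio.py (which handles errors, connection state, etc.), just an illustration of the switching described above:

import greenlet

async def greenlet_spawn(fn, *args, **kwargs):
    # Illustrative sketch: run fn in a child greenlet. Each time the
    # child hands us a coroutine (via await_), await it on the event
    # loop, then switch the result back into the child.
    result = []
    def runner():
        result.append(fn(*args, **kwargs))
    child = greenlet.greenlet(runner)
    value = child.switch()
    while not child.dead:
        value = child.switch(await value)
    return result[0]

def await_(coro):
    # Switch back to the parent ("async") greenlet, handing it the
    # coroutine. Execution resumes here when the parent switches back
    # in with the coroutine's result.
    return greenlet.getcurrent().parent.switch(coro)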
In your code you should never need to use greenlet_spawn() or await_() directly, however. Peewee provides a general-purpose run() method which ensures synchronous Peewee code does not block during query execution:
import asyncio
from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    def work():
        print(db.execute_sql('select 1').fetchall())
    await db.run(work)

asyncio.run(demo())  # prints [(1,)]
As described earlier, Peewee provides a number of async-ready helper methods for common operations, so the run() helper can be avoided in many cases:
import asyncio
from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    curs = await db.aexecute_sql('select 1')
    print(curs.fetchall())

asyncio.run(demo())  # prints [(1,)]
Note that obtaining the results from the cursor does not happen asynchronously
(e.g. we do not call print(await curs.fetchall())). Internally Peewee does
await fetching the results from the cursor, but the rows are all loaded before
the cursor is returned to the caller. This ensures consistency with existing
behavior, though in future versions we may add support for streaming cursor
results (via Postgres server-side cursors).
Connections and Transactions
State management in Peewee is handled by a thread-local (or greenlet-local, if you're using gevent) - this ensures that separate threads own their own connection and transaction state. In asyncio, particularly when mixed with the greenlet helper, neither of these options is available. Instead, Peewee uses a task-local that ensures each asyncio task has its own connection and transaction stack. This shared-nothing architecture hopefully makes working with async Peewee easier, though there is an implied limit on concurrency depending on the size of your connection pool and how quickly you release connections back into it.
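For example (a sketch using only the helpers shown earlier), two concurrent tasks each acquire their own connection and their own transaction stack, so their work never interleaves within a single connection:

# Each task acquires its own connection from the pool, so these two
# tasks run their transactions independently of one another.
async def add_user(name):
    async with db:               # connection scoped to this task
        async with db.atomic():  # transaction scoped to this task
            await db.run(User.create, name=name)

await asyncio.gather(add_user('alpha'), add_user('beta'))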
Transactions continue to work the same whether you're inside a coroutine or not:
async def create_users_async():
    async with db.atomic():
        await db.run(User.create, name='u1')
        await db.run(User.create, name='u2')

def create_users_sync():
    with db.atomic():
        User.create(name='u3')
        User.create(name='u4')

await create_users_async()
await db.run(create_users_sync)
Caveats
There are limitations to what can be achieved with this approach. The main one I foresee causing problems is lazy foreign-key resolution. Consider this example:
tweet = await db.get(Tweet.select())
print(tweet.user.name) # Fails.
# MissingGreenletBridge: Attempted query SELECT ... outside greenlet runner.
This fails because the relationship tweet.user was not explicitly fetched, so
Peewee attempts to issue a SELECT query to get the related user - but we are
not operating inside the greenlet-bridged environment, so that implicit query
cannot run.
One solution is to resolve foreign keys inside db.run():
print(await db.run(lambda: tweet.user.name))
Even better is to select the related object explicitly:
query = Tweet.select(Tweet, User).join(User)
tweet = await db.get(query)
print(tweet.user.name) # OK, no extra SELECT required.
In a similar way, iterating the related objects requires a query:
for tweet in user.tweet_set:
    print(tweet.message)
# MissingGreenletBridge: Attempted query SELECT ... outside greenlet runner.
As above, there are a few ways to accomplish this:
# Use the db.run() helper:
tweets = await db.run(list, user.tweet_set)
for tweet in tweets:
    print(tweet.message)

# Use the db.list() helper:
for tweet in await db.list(user.tweet_set):
    print(tweet.message)

# Use prefetch (not a great fit, but just to demonstrate):
user_query = User.select().where(User.id == user.id)
tweet_query = Tweet.select()
user, = await db.run(prefetch, user_query, tweet_query)
for tweet in user.tweet_set:
    print(tweet.message)
Future
Over time, the surface of the asyncio API will probably grow to encompass more use-cases where an async helper would make sense. My desire was to start small, though - it's the spirit of Peewee, after all.
Links:
- Documentation
- Code - currently in the master branch but will be released soonish.
- Mike Bayer's excellent post on asyncio and databases, which is worth a read for anyone pondering the utility or futility of asyncio and ORMs.
- My own thoughts on asyncio from a couple years ago