Asyncio Finally Got Peewee
Which way I fly is hell; myself am hell;
And in the lowest deep a lower deep
Still threat'ning to devour me opens wide,
To which the hell I suffer seems a heav'n.
I feel that it is high time that Peewee had an asyncio story. I've avoided this for years because asyncio is, in my opinion, a lousy design for anything beyond massively concurrent I/O multiplexing. Database operations and transactions, while needing to support some degree of concurrency (Sqlite doesn't even manage that), are still rather heavy operations - Postgres' default config specifies a max of 100 connections. Nonetheless, the writing has been on the wall for years now, and much like toml, type whispering, and the withering bouquet of packaging tools waiting to be tossed, asyncio is not going anywhere. Peewee remains a synchronous ORM, but in order to work with the ever-widening sphere of async-first web frameworks and database drivers, it was time to come up with a plan.
That inscrutable thing is chiefly what I hate; and be the white whale agent, or be the white whale principal, I will wreak that hate upon him.
While I still prefer threads or gevent, the
implementation in playhouse/pwasyncio.py presents a fairly manageable surface
area (at time of writing under 500 lines of code). Over time I'm sure this will
grow, but this is a beginning at any rate.
The guiding principle of the new asyncio layer is that it doesn't get to affect
the ORM. Queries, working with models, transactions, and resolving
relationships all continue to behave the way they always have. Asyncio happens
only during calls to execute_sql(), the lowest-common-denominator for
operations in Peewee. To accomplish this, I've borrowed the approach SQLAlchemy
uses: greenlet (not gevent!) wraps the synchronous code in something we can
suspend and resume when blocking occurs.
Control can then be passed back-and-forth between our greenlet and the asyncio
event loop without needing to rewrite everything top-to-bottom.
This design makes it so that, rather than rewriting Peewee to be async-friendly, we instead have a thin compatibility layer that bridges synchronous Peewee with the asyncio event loop and programming model.
This is a barebones example using the run() helper, the primary entry-point
for asyncio code:
import asyncio

from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db')

class User(db.Model):
    name = TextField()

async def demo():
    async with db:
        await db.run(db.create_tables, [User])
        await db.run(User.create, name='Charlie')

        def get_user():
            return User.select().where(User.name == 'Charlie').get()
        user = await db.run(get_user)

        def bulk_insert():
            with db.atomic():
                iq = (User
                      .insert_many([{'name': 'Alice'}, {'name': 'Bob'}])
                      .returning(User))
                users = iq.execute()
                print('Added users: %s' % list(users))
        await db.run(bulk_insert)

        users = await db.run(list, User.select().order_by(User.name))
        for user in users:
            print(user.name)

    # Close the pool - the connection was released, but it still remains inside
    # the pool, so this ensures we are ready to shutdown completely.
    await db.close_pool()

asyncio.run(demo())
As you can see, Peewee still looks pretty much synchronous, and everything is passed into the asyncio bridge. If that isn't to your taste, though, here is the same example, but using a few asyncio helper methods:
import asyncio

from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db')

class User(db.Model):
    name = TextField()

async def demo():
    async with db:
        await db.acreate_tables([User])

        user = await db.run(User.create, name='Charlie')
        user_db = await db.get(User.select().where(User.name == 'Charlie'))
        assert user.name == user_db.name == 'Charlie'

        async with db.atomic():
            iq = (User
                  .insert_many([{'name': 'Alice'}, {'name': 'Bob'}])
                  .returning(User))
            users = await db.aexecute(iq)
            print('Added users: %s' % list(users))

        for user in await db.list(User.select().order_by(User.name)):
            print(user.name)

    await db.close_pool()

asyncio.run(demo())
When running Peewee ORM code, you can choose between the two execution patterns
depending on how explicit you want to be. The most general-purpose method is to
write your code synchronously and pass a callable to the run() method, e.g.:
def create_user():
    return User.create(username='huey')

user1 = await db.run(create_user)
user2 = await db.run(User.create, username='mickey')
Alternately, you can use the various async helpers where appropriate:
users = await db.list(User.select())
count = await db.scalar(User.select(fn.COUNT(User.id)))
user = await db.get(User.select().where(User.id == 1))
pk = await db.aexecute(User.insert(name='Huey'))
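To give a sense of how these helpers might slot into an async-first web framework (one of the motivations mentioned at the top), here is a hypothetical sketch of a FastAPI endpoint. The app, route and serialization choices are mine, not part of pwasyncio; model_to_dict comes from playhouse.shortcuts, and db / User are assumed to be the database and model from the earlier examples.

from fastapi import FastAPI
from playhouse.shortcuts import model_to_dict

app = FastAPI()  # illustrative only - any async framework would look similar

@app.get('/users')
async def list_users():
    # Each request runs as its own asyncio task, so it checks out its own
    # connection and releases it when the block exits.
    async with db:
        users = await db.list(User.select().order_by(User.name))
    return [model_to_dict(u) for u in users]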
How does it work?
Synchronous ORM code runs inside a greenlet, and async I/O is bridged
explicitly by Peewee using two helpers, greenlet_spawn() and await_(). The
greenlet_spawn() helper runs synchronous code, but can be suspended and
resumed in order to yield to the asyncio event loop. Yielding is done by the
await_() helper, which suspends the greenlet and passes control to the
asyncio coroutine.
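To make the mechanism concrete, here is a rough sketch - not Peewee's actual code, just the general shape of the greenlet bridge that SQLAlchemy popularized - of how greenlet_spawn() and await_() can be built on top of the greenlet package:

import greenlet

async def greenlet_spawn(fn, *args, **kwargs):
    # Run the synchronous fn() inside a child greenlet so it can be suspended.
    parent = greenlet.getcurrent()
    child = greenlet.greenlet(fn, parent)
    result = child.switch(*args, **kwargs)
    while not child.dead:
        # The child handed us an awaitable via await_(); await it on the event
        # loop, then resume the child with the result (or the exception).
        try:
            value = await result
        except BaseException as exc:
            result = child.throw(exc)
        else:
            result = child.switch(value)
    return result

def await_(awaitable):
    # Called from inside the child greenlet: switch back to greenlet_spawn(),
    # which awaits the awaitable and resumes us with its result.
    return greenlet.getcurrent().parent.switch(awaitable)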
Peewee wraps all this up in a general-purpose run() method, which is the
entrypoint for pretty much all async operations:
import asyncio

from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')

    def work():
        print(db.execute_sql('select 1').fetchall())

    await db.run(work)

asyncio.run(demo())  # prints [(1,)]
The basic flow goes something like this:
1. The above code eventually hits the db.run() method. This method calls the greenlet_spawn() function, creating a resumable coroutine wrapping our synchronous code.
2. The greenlet begins executing the synchronous Peewee code.
3. We call db.execute_sql('select 1').
4. The async database implementation calls our special await_() helper, which switches control back to the event loop.
5. The event loop awaits the coroutine, e.g. await conn.execute(...), awaiting the results from the cursor before handing them back.
6. The result cursor is sent back to the greenlet, and the greenlet resumes.
7. db.execute_sql() returns and the rest of the code continues normally.
8. We call fetchall() on the result cursor, which returns all the rows loaded during (5).
If we try to run execute_sql() outside of the greenlet helper, Peewee will
raise an exception:
async def demo():
    db = AsyncSqliteDatabase(':memory:')
    print(db.execute_sql('select 1').fetchall())
    # raises RuntimeError: await_() called outside greenlet_spawn()

asyncio.run(demo())
Peewee provides a number of async-ready helper methods for common operations,
so the run() helper can be avoided:
import asyncio

from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    curs = await db.aexecute_sql('select 1')
    print(curs.fetchall())

asyncio.run(demo())  # prints [(1,)]
Note that obtaining the results from the cursor does not happen asynchronously
(e.g. we do not call print(await curs.fetchall())). Internally Peewee does
await fetching the results from the cursor, but the rows are all loaded before
the cursor is returned to the caller. This ensures consistency with existing
behavior, though in future versions we may add support for streaming cursor
results (via Postgres server-side cursors).
Connections and Transactions
State management in Peewee is handled by a thread-local (or greenlet-local if you're using gevent) - this ensures that separate threads own their own connection and transaction state. In asyncio, particularly when mixed with the greenlet helper, neither of these options is available. Instead, Peewee uses a task-local that ensures each asyncio task has its own connection and transaction stack. This shared-nothing approach hopefully makes working with async Peewee easier, though there is an implicit cap on concurrency determined by the size of your connection pool and how quickly you release connections back into it.
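As a quick illustration (a sketch only, reusing the db and User model from the earlier examples), several tasks can run queries concurrently, each on its own connection checked out from the pool:

import asyncio

async def count_users(label):
    # Each asyncio task checks out its own connection from the pool and
    # maintains its own transaction stack; no state is shared between tasks.
    async with db:
        n = await db.scalar(User.select(fn.COUNT(User.id)))
        print('%s sees %s users' % (label, n))

async def main():
    await asyncio.gather(*(count_users('task-%s' % i) for i in range(3)))
    await db.close_pool()

asyncio.run(main())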
Transactions continue to work the same whether you're inside a coroutine or not:
async def create_users_async():
    async with db.atomic():
        await db.run(User.create, name='u1')
        await db.run(User.create, name='u2')

def create_users_sync():
    with db.atomic():
        User.create(name='u3')
        User.create(name='u4')

await create_users_async()
await db.run(create_users_sync)
Caveats
There are limitations to what can be achieved with this approach. The main one I foresee causing problems is lazy foreign-key resolution. Consider this example:
tweet = await db.get(Tweet.select())
print(tweet.user.name) # Fails.
# RuntimeError: await_() called outside greenlet_spawn()
This fails because the relationship tweet.user was not explicitly fetched, so
accessing it causes Peewee to issue a SELECT query for the related user. Since
we are not operating inside the greenlet-bridged environment, that implicit
query raises an error.
One solution is to resolve foreign keys inside db.run():
print(await db.run(lambda: tweet.user.name))
Even better is to select the related object explicitly:
query = Tweet.select(Tweet, User).join(User)
tweet = await db.get(query)
print(tweet.user.name) # OK, no extra SELECT required.
In a similar way, iterating the related objects requires a query:
for tweet in user.tweet_set:
    print(tweet.message)
# RuntimeError: await_() called outside greenlet_spawn()
Like above, there are a few ways you can accomplish this:
# Use the db.run() helper:
tweets = await db.run(list, user.tweet_set)
for tweet in tweets:
    print(tweet.message)

# Use the db.list() helper:
for tweet in await db.list(user.tweet_set):
    print(tweet.message)

# Use prefetch (not a great fit, but just to demonstrate):
user_query = User.select().where(User.id == user.id)
tweet_query = Tweet.select()
user, = await db.run(prefetch, user_query, tweet_query)
for tweet in user.tweet_set:
    print(tweet.message)
Future
Over time, the surface of the asyncio API will probably grow to encompass more use-cases where an async helper would make sense. My desire was to start small, though - it's the spirit of Peewee, after all.
Links:
- Documentation
- Code - currently in the master branch but will be released soonish.
- Mike Bayer's excellent post on asyncio and databases, which is worth a read for anyone pondering the utility or futility of asyncio and ORMs.
- My own thoughts on asyncio from a couple years ago
