Asyncio Finally Got Peewee
I feel that it is high time that Peewee had an asyncio story. I've avoided this for years. Database operations and transactions, while needing to support some degree of concurrency, are still rather heavy operations. The default Postgres config specifies a max of 100 connections, for example, and SQLite doesn't even support concurrent writers. Nonetheless, the writing has been on the wall for years now, and much like toml, type whispering, and the withering bouquet of packaging tools waiting to be tossed, asyncio is not going anywhere. Peewee is still a synchronous ORM, but in order to work with the ever-widening sphere of async-first web frameworks and database drivers, it was time to come up with a plan.
Rather than rewriting Peewee to be async-first or building a separate coroutine-based implementation, playhouse.pwasyncio instead provides a thin compatibility layer that bridges synchronous Peewee with the asyncio event loop. This is accomplished using greenlet, the exact same approach used by SQLAlchemy. When database I/O occurs inside the Peewee-managed greenlet, control is yielded to the event loop until the async driver completes its operation. This makes it possible to use existing synchronous code and rely on Peewee's async bridging to handle yielding to the loop whenever queries are executed.
To get started use one of the Async database implementations. Typically this is the only thing you will need in order to use Peewee with asyncio:
import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase
db = AsyncSqliteDatabase('my_app.db')
class User(db.Model):
    name = TextField()
Queries must be executed through an async execution method. This ensures that when blocking would occur, control is properly yielded to the event loop. The database context (async with db) acquires a connection from the pool and releases it on exit:
async def main():
    async with db:
        await db.acreate_tables([User])
        # Create a new user in a transaction.
        async with db.atomic():
            user = await User.acreate(name='Huey')
        # Fetch a single row from the database.
        huey = await User.aget(User.name == 'Huey')
        assert huey.id == user.id
        # Update a row.
        huey.name = 'Huey-zai'
        await huey.asave()
        # Execute a query and iterate the buffered results.
        for user in await User.select().order_by(User.name).aexecute():
            print(user.name)
        # Stream results from the database (uses server-side cursors where
        # available).
        async for user in db.iterate(User.select().order_by(User.name)):
            print(user.name)
    await db.close_pool()
asyncio.run(main())
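To make the acquire-on-enter, release-on-exit behavior of a pooled async context manager concrete, here is a toy sketch built only on the standard library. ToyPool and its connection() method are hypothetical names invented for illustration, and blocking sqlite3 connections stand in for a real async driver. This is not how playhouse.pwasyncio is implemented, only the general shape of the pattern.

```python
import asyncio
import sqlite3
from contextlib import asynccontextmanager


class ToyPool:
    """Hypothetical connection pool; NOT Peewee's implementation."""

    def __init__(self, path, size=2):
        self._idle = asyncio.Queue()
        for _ in range(size):
            self._idle.put_nowait(sqlite3.connect(path))

    @asynccontextmanager
    async def connection(self):
        conn = await self._idle.get()    # waits if every connection is in use
        try:
            yield conn
        finally:
            self._idle.put_nowait(conn)  # always hand the connection back

    def close(self):
        # Close whatever is sitting idle in the pool.
        while not self._idle.empty():
            self._idle.get_nowait().close()


async def main():
    pool = ToyPool(':memory:', size=1)   # created inside the running loop
    async with pool.connection() as conn:
        first = conn.execute('SELECT 1').fetchone()
        assert pool._idle.empty()        # the only connection is checked out
    # The connection was released on exit, so it can be acquired again.
    async with pool.connection() as conn:
        second = conn.execute('SELECT 2').fetchone()
    pool.close()
    return first, second


print(asyncio.run(main()))  # ((1,), (2,))
```

The try/finally around the yield is the important part: the connection goes back to the pool even if the body of the async with block raises.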
Models bound to an async database get a-prefixed coroutine versions of the usual row methods (acreate, aget, asave, adelete_instance, etc). These are the most direct way to read and write individual rows:
user = await User.acreate(name='Huey')
user.name = 'Huey-zai'
await user.asave()
huey = await User.aget(User.name == 'Huey-zai')
obj, created = await User.aget_or_create(name='Mickey')
await huey.adelete_instance()
The query-builder methods (select(), where(), and the rest) only build SQL, so they need no async variant. The one query method that performs the I/O, execute(), has a corresponding aexecute():
# SELECT returns a buffered result wrapper:
for user in await User.select().order_by(User.name).aexecute():
    print(user.name)
# Writes return their usual values.
pk = await User.insert(name='Huey').aexecute()
n = await User.update(name='Zaizee').where(User.name == 'Huey').aexecute()
# Returning works as expected.
update = PageCount.update(count=PageCount.count + 1).where(PageCount.url == url)
for row in await update.returning(PageCount).aexecute():
    print('%s has %s views.' % (row.url, row.count))
The one sharp corner is lazy foreign-key access. Touching tweet.user outside the bridge fires a synchronous query and raises MissingGreenletBridge. Select the relation up front, eagerly load it, or load it explicitly with await tweet.afetch(Tweet.user), which fetches the related row and caches it on the instance so later attribute access is free.
For arbitrary synchronous code, the most general way to make database operations non-blocking is to write your code synchronously and pass a callable to the run() method, e.g.:
# Single operation:
user = await db.run(User.create, name='Alice')
# Multi-step function:
def register(username, bio):
    with db.atomic():
        user = User.create(name=username)
        Profile.create(user=user, bio=bio)
    return user
user = await db.run(register, 'alice', 'Python developer')
db.run() is recommended when:
- You have existing synchronous code you want to call from async.
- A single operation involves multiple queries (e.g. a transaction).
Additionally, you can use the various async helpers:
# Execute any query and get its natural return type.
spammers = (User.delete()
            .where(User.name.contains('billing'))
            .returning(User.name))
for spammer in await db.aexecute(spammers):
    print('Deleted', spammer.name)
# db.aexecute(query) and query.aexecute() are equivalent:
recent = await User.select().order_by(User.id.desc()).aexecute()
# Use a transaction:
async with db.atomic() as tx:
    await db.run(User.create, name='Bob')
# SELECT and return one model instance (raises DoesNotExist if none).
user = await db.get(User.select().where(User.name == 'Alice'))
# Or use the model method:
user = await User.aget(User.name == 'Alice')
# SELECT and return a list.
users = await db.list(User.select().order_by(User.name))
# SELECT and stream results from the database asynchronously.
users = [user async for user in db.iterate(User.select())]
# SELECT and return a scalar value.
count = await db.scalar(User.select(fn.COUNT(User.id)))
# Or use the shortcut.
count = await db.count(User.select())
# CREATE TABLE / DROP TABLE:
await db.acreate_tables([User, Tweet])
await db.adrop_tables([User, Tweet])
# Raw SQL:
cursor = await db.aexecute_sql('SELECT 1')
print(cursor.fetchall()) # [(1,)]
For more details, see the documentation.
FastAPI example
Peewee's asyncio integration can be used with async-first frameworks like FastAPI.
The following example demonstrates how to:
- Ensure a connection is opened and closed for each request.
- Create tables/resources when the app server starts.
- Shut down the connection pool when the app server exits.
from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI
from peewee import *
from playhouse.pwasyncio import *
db = AsyncPostgresqlDatabase('peewee_test')
class User(db.Model):
    username = TextField()
async def get_db():
    async with db:
        yield db
@asynccontextmanager
async def lifespan(app):
    async with db:
        await db.acreate_tables([User])
    yield
    await db.close_pool()
app = FastAPI(lifespan=lifespan)
@app.get('/users')
async def list_users(db=Depends(get_db)):
    return await db.list(User.select().dicts())
Peewee also comes with pydantic support - see the example for more details.
How does it work?
Synchronous ORM code runs inside a greenlet, and async I/O is bridged internally by Peewee using two helpers, greenlet_spawn() and await_(). The
greenlet_spawn() helper runs synchronous code inside a greenlet, and can be suspended and
resumed in order to yield to the asyncio event loop. Yielding is done by the
await_() helper, which suspends the greenlet and passes control to the
asyncio coroutine. View the code to see the full implementation.
Here's a small example showing the execution path and how context switches between sync and async worlds:
import asyncio
from playhouse.pwasyncio import *
# Synchronous function gets run by the greenlet helper.
def synchronous():
    return await_(a_add(1, 2))
# Normal async function.
async def a_add(x, y):
    await asyncio.sleep(1)
    return x + y
async def main():
    result = await greenlet_spawn(synchronous)
    print(result)
asyncio.run(main()) # After 1 second, prints 3
When this runs:
- main() is declared async def and runs in the "async" world. It awaits greenlet_spawn, passing in our sync callable.
- Inside greenlet_spawn we create a new greenlet to run our synchronous callable, with its parent greenlet set to our async greenlet runner. This is the link between sync and async Python. Inside the greenlet everything is synchronous, but it can yield coroutines to the parent (async), which then runs them on the loop.
- Inside the greenlet we begin executing our synchronous code - it does NOT know anything about asyncio. Eventually it hits await_(a_add(1, 2)). When we call a_add(1, 2) Python gives us a coroutine, which gets passed to await_().
- Inside the await_() helper we switch contexts back to the parent "async" world, passing our a_add() coroutine.
- Back in the async world, we await the coroutine. The event loop runs a_add() and gives us back the result.
- The async greenlet_spawn() helper then passes that result back into the synchronous greenlet as the return value of our await_() call. Synchronous execution resumes and returns the result.
- At this point the greenlet running our synchronous code has finished. greenlet_spawn() now finishes and returns the result (3) to main(), which gets printed.
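The hand-off described in the steps above can be mimicked with nothing but the standard library by letting a generator stand in for the greenlet. This is only an analogy, not Peewee's code: generator_spawn() is a hypothetical name, and yield plays the part of await_(). The real greenlet version is more powerful, because a greenlet can switch out from arbitrarily deep inside ordinary function calls, whereas a generator must yield explicitly at every level.

```python
import asyncio


async def a_add(x, y):
    await asyncio.sleep(0.01)
    return x + y


def synchronous():
    # `yield` stands in for await_(): hand a coroutine to the async driver,
    # then receive its result when execution resumes here.
    result = yield a_add(1, 2)
    return result


async def generator_spawn(gen_fn, *args):
    """Hypothetical stand-in for greenlet_spawn(), driven by a generator."""
    gen = gen_fn(*args)
    try:
        awaitable = next(gen)            # run sync code until it "awaits"
        while True:
            try:
                value = await awaitable  # async world: run it on the loop
            except Exception as exc:
                awaitable = gen.throw(exc)   # surface errors in the sync code
            else:
                awaitable = gen.send(value)  # resume sync code with the result
    except StopIteration as stop:
        return stop.value                # the sync code has finished


result = asyncio.run(generator_spawn(synchronous))
print(result)  # 3
```

The loop in generator_spawn() covers the last four steps of the walkthrough: receive a coroutine, await it on the event loop, send the result back in, and return once the synchronous side is done.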
The skeptical can verify that our synchronous callable is running asynchronously:
async def run_several():
    tasks = [greenlet_spawn(synchronous) for i in range(10)]
    print(await asyncio.gather(*tasks))
import time
start = time.perf_counter()
asyncio.run(run_several())
print(time.perf_counter() - start) # 1.00...
In your code you should never need to use greenlet_spawn() or await_() directly. Instead use the run() method, which ensures synchronous code does not block during query executions:
import asyncio
from playhouse.pwasyncio import *
async def demo():
    db = AsyncSqliteDatabase(':memory:')
    def work():
        print(db.execute_sql('select 1').fetchall())
    await db.run(work)
asyncio.run(demo()) # prints [(1,)]
As described earlier, Peewee provides a number of async-ready helper methods for common operations, so the run() helper can be avoided in many cases:
import asyncio
from playhouse.pwasyncio import *
async def demo():
    db = AsyncSqliteDatabase(':memory:')
    curs = await db.aexecute_sql('select 1')
    print(curs.fetchall())
asyncio.run(demo()) # prints [(1,)]
Note that obtaining the results from the cursor does not happen asynchronously in this example
(e.g. we do not call print(await curs.fetchall())). Internally Peewee does
await fetching the results from the cursor, but the rows are all loaded before
the cursor is returned to the caller. This ensures consistency with existing
behavior. For server-side cursors with async result streaming, see db.iterate().
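The difference between a buffered result and a streamed one is mostly a difference in calling convention, which a toy example can show without Peewee at all. In the sketch below, buffered() and streamed() are hypothetical helpers written for illustration, and the stdlib sqlite3 module (which is blocking) stands in for an async driver, so this shows the shape of the two styles rather than real non-blocking I/O.

```python
import asyncio
import sqlite3


def buffered(conn, sql):
    """Load every row up front and hand back a plain list."""
    return conn.execute(sql).fetchall()


async def streamed(conn, sql, chunk=2):
    """Yield rows in small batches, giving the loop a turn between batches."""
    cursor = conn.execute(sql)
    while True:
        rows = cursor.fetchmany(chunk)
        if not rows:
            break
        for row in rows:
            yield row
        await asyncio.sleep(0)  # let other tasks run before the next batch


async def main():
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE t (n INTEGER)')
    conn.executemany('INSERT INTO t VALUES (?)', [(i,) for i in range(5)])
    query = 'SELECT n FROM t ORDER BY n'
    all_rows = buffered(conn, query)  # ordinary iteration afterwards
    streamed_rows = [row async for row in streamed(conn, query)]
    conn.close()
    return all_rows, streamed_rows


all_rows, streamed_rows = asyncio.run(main())
print(all_rows == streamed_rows)  # True
```

The buffered form is simpler to consume and matches how a synchronous cursor behaves, while the streamed form keeps memory bounded for large result sets at the cost of requiring async for.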
Future
Over time, the surface of the asyncio API will probably grow to encompass more
use-cases where an async helper would make sense. My desire was to start small,
though, as that's the spirit of Peewee. Since this was first written the API has
already grown a little, gaining a-prefixed model methods and an aexecute()
method on queries, and the asyncio extension is no longer considered
preliminary.
Links:
- Documentation
- Code - available on PyPI, stable as of 4.0.8.
- Mike Bayer's excellent post on asyncio and databases, which is worth a read for anyone pondering the utility or futility of asyncio and ORMs.
- My own thoughts on asyncio from a couple years ago
