Asyncio Finally Got Peewee
Which way I fly is hell; myself am hell;
And in the lowest deep a lower deep
Still threat'ning to devour me opens wide,
To which the hell I suffer seems a heav'n.
I feel that it is high time that Peewee had an asyncio story. I've avoided this for years. Database operations and transactions, while needing to support some degree of concurrency, are still rather heavy operations. Postgres default config specifies a max of 100 connections, for example, and SQLite doesn't even support concurrent writers. Nonetheless, the writing has been on the wall for years now, and much like toml, type whispering, and the withering bouquet of packaging tools waiting to be tossed, asyncio is not going anywhere. Peewee is still a synchronous ORM, but in order to work with the ever-widening sphere of async-first web frameworks and database drivers, it was time to come up with a plan.
That inscrutable thing is chiefly what I hate; and be the white whale agent, or be the white whale principal, I will wreak that hate upon him.
Rather than rewriting Peewee to be async-first or building a separate coroutine-based implementation, playhouse.pwasyncio instead provides a thin compatibility layer that bridges synchronous Peewee with the asyncio event loop. This is accomplished using greenlet, the same approach used by SQLAlchemy. When database I/O occurs inside the Peewee-managed greenlet, control is yielded to the event loop until the async driver completes its operation. This makes it possible to use existing synchronous code and rely on Peewee's async bridging to handle yielding to the loop whenever queries are executed.
To get started, use one of the async database implementations. Typically this is the only thing you will need in order to use Peewee with asyncio:
```python
import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('my_app.db')

class User(db.Model):
    name = TextField()
```
Queries must be executed through an async execution method. This ensures that when blocking would occur, control is properly yielded to the event loop. The database context (async with db) acquires a connection from the pool and releases it on exit:
```python
async def main():
    async with db:
        await db.acreate_tables([User])

        # Create a new user in a transaction.
        async with db.atomic() as txn:
            user = await db.run(User.create, name='Huey')

        # Fetch a single row from the database.
        huey = await db.get(User.select().where(User.name == 'Huey'))
        assert huey.name == user.name

        # Execute a query and iterate results.
        for user in await db.list(User.select().order_by(User.name)):
            print(user.name)

        # Async lazy result fetching (uses server-side cursors where
        # available).
        query = User.select().order_by(User.name)
        async for user in db.iterate(query):
            print(user.name)

    await db.close_pool()

asyncio.run(main())
```
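The `async with db` block acquires a pooled connection and hands it back on exit. The sketch below shows that acquire/release lifecycle using only the standard library. `TinyPool` and `PooledContext` are hypothetical names, and the "connections" are plain strings. This is not Peewee's actual pool implementation.

```python
import asyncio


class TinyPool:
    """Hypothetical fixed-size pool; stands in for a real driver pool."""

    def __init__(self, size):
        self._free = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait(f"conn-{i}")

    async def acquire(self):
        # Waits (without blocking the event loop) until a connection is free.
        return await self._free.get()

    def release(self, conn):
        self._free.put_nowait(conn)


class PooledContext:
    """Hypothetical stand-in for `async with db`: acquire on entry, release on exit."""

    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        # Runs even if the body raised, so the connection is always returned.
        self.pool.release(self.conn)
        self.conn = None
        return False  # do not swallow exceptions


async def handler(pool, log):
    async with PooledContext(pool) as conn:
        log.append(conn)
        await asyncio.sleep(0.01)


async def main():
    pool = TinyPool(size=2)
    log = []
    # Five concurrent "requests" share two connections.
    await asyncio.gather(*(handler(pool, log) for _ in range(5)))
    return pool, log


pool, log = asyncio.run(main())
print(len(log), pool._free.qsize())  # 5 2
```

Tasks that find the pool empty wait on the queue instead of failing. Both connections are back in the pool once every request has finished.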
The most general way to make database operations non-blocking with Peewee is to write your code synchronously and pass a callable to the run() method, e.g.:
```python
# Single operation:
user = await db.run(User.create, name='Alice')

# Multi-step function (assumes a Profile model with `user` and `bio` fields):
def register(username, bio):
    with db.atomic():
        user = User.create(name=username)
        Profile.create(user=user, bio=bio)
    return user

user = await db.run(register, 'alice', 'Python developer')
```
db.run() is recommended when:
- You have existing synchronous code you want to call from async.
- A single operation involves multiple queries (e.g. a transaction).
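The point of passing a whole multi-step function to db.run() is that every query in it belongs to one transaction, which either commits or rolls back as a unit. The sketch below shows only that atomicity property. It swaps in the standard library's `asyncio.to_thread` and `sqlite3` as a stand-in for `db.run()`; Peewee itself uses greenlets, not worker threads. The `register` function and the table schema are hypothetical.

```python
import asyncio
import sqlite3


def register(conn, username, bio, fail=False):
    # Plain synchronous code: two INSERTs that must succeed or fail together.
    with conn:  # sqlite3: commits on success, rolls back on exception
        cur = conn.execute("INSERT INTO user (name) VALUES (?)", (username,))
        if fail:
            raise RuntimeError("simulated failure after first INSERT")
        conn.execute("INSERT INTO profile (user_id, bio) VALUES (?, ?)",
                     (cur.lastrowid, bio))
    return cur.lastrowid


async def main():
    # check_same_thread=False because asyncio.to_thread runs `register`
    # on a worker thread, not the thread that opened the connection.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute("CREATE TABLE user (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("CREATE TABLE profile (user_id INTEGER, bio TEXT)")

    # One call wraps the whole transaction (stand-in for db.run(register, ...)).
    await asyncio.to_thread(register, conn, "alice", "Python developer")
    try:
        await asyncio.to_thread(register, conn, "bob", "n/a", fail=True)
    except RuntimeError:
        pass  # bob's partial INSERT was rolled back

    rows = conn.execute("SELECT name FROM user").fetchall()
    conn.close()
    return rows


print(asyncio.run(main()))  # [('alice',)]
```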
Additionally, you can use the various async helpers:
```python
# Execute any query and get its natural return type.
query = (User.delete()
         .where(User.name.contains('billing'))
         .returning(User.name))
for spammer in await db.aexecute(query):
    print('Deleted', spammer.name)

# Use a transaction:
async with db.atomic() as tx:
    await db.run(User.create, name='Bob')

# SELECT and return one model instance (raises DoesNotExist if none).
user = await db.get(User.select().where(User.name == 'Alice'))

# SELECT and return a list.
users = await db.list(User.select().order_by(User.name))

# SELECT and stream results from the database asynchronously.
users = [user async for user in db.iterate(User.select())]

# SELECT and return a scalar value.
count = await db.scalar(User.select(fn.COUNT(User.id)))

# Or use the shortcut.
count = await db.count(User.select())

# CREATE TABLE / DROP TABLE:
await db.acreate_tables([User, Tweet])
await db.adrop_tables([User, Tweet])

# Raw SQL:
cursor = await db.aexecute_sql('SELECT 1')
print(cursor.fetchall())  # [(1,)]
```
For more details, see the documentation.
FastAPI example
Peewee's asyncio integration can be used with async-first frameworks like FastAPI.
The following example demonstrates how to:
- Ensure a connection is opened and closed for each request.
- Create tables and other resources when the app server starts.
- Shut down the connection pool when the app server exits.
```python
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI
from peewee import *
from playhouse.pwasyncio import *

db = AsyncPostgresqlDatabase('peewee_test')

class User(db.Model):
    username = TextField()

# Per-request dependency: a connection is acquired for the request and
# released when the request finishes.
async def get_db():
    async with db:
        yield db

@asynccontextmanager
async def lifespan(app):
    # Startup: create tables.
    async with db:
        await db.acreate_tables([User])
    yield
    # Shutdown: close the connection pool.
    await db.close_pool()

app = FastAPI(lifespan=lifespan)

@app.get('/users')
async def list_users(db=Depends(get_db)):
    return await db.list(User.select().dicts())
```
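The sketch below traces the order in which the FastAPI example's startup, per-request, and shutdown steps happen. It replaces the database with a hypothetical `FakeDB` that only records events. It also drives the generator dependency by hand with `asynccontextmanager`, which approximates, but is not identical to, what FastAPI does internally.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


class FakeDB:
    """Hypothetical stand-in that records what the real database would do."""

    async def __aenter__(self):
        events.append("connect")
        return self

    async def __aexit__(self, *exc):
        events.append("release")
        return False

    async def acreate_tables(self, models):
        events.append("create_tables")

    async def close_pool(self):
        events.append("close_pool")


db = FakeDB()


async def get_db():
    # Per-request dependency: one connection for the life of the request.
    async with db:
        yield db


@asynccontextmanager
async def lifespan(app):
    async with db:
        await db.acreate_tables(["User"])
    yield  # the server would handle requests here
    await db.close_pool()


async def simulate_server(n_requests):
    async with lifespan(app=None):
        for _ in range(n_requests):
            # Approximates how a framework enters a generator dependency.
            async with asynccontextmanager(get_db)() as _conn:
                events.append("handle_request")


asyncio.run(simulate_server(2))
print(events)
```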
Peewee also comes with pydantic support; see the example for more details.
How does it work?
Synchronous ORM code runs inside a greenlet, and Peewee bridges async I/O internally using two helpers, greenlet_spawn() and await_(). The greenlet_spawn() helper runs synchronous code inside a greenlet, which can be suspended and resumed in order to yield to the asyncio event loop. Yielding is done by the await_() helper, which suspends the greenlet and hands its coroutine back to the async side to be awaited on the loop. View the code to see the full implementation.
Here's a small example showing the execution path and how context switches between sync and async worlds:
```python
import asyncio
from playhouse.pwasyncio import *

# Synchronous function gets run by the greenlet helper.
def synchronous():
    return await_(a_add(1, 2))

# Normal async function.
async def a_add(x, y):
    await asyncio.sleep(1)
    return x + y

async def main():
    result = await greenlet_spawn(synchronous)
    print(result)

asyncio.run(main())  # After 1 second, prints 3
```
When this runs:
- main() is declared async def and runs in the "async" world. It awaits greenlet_spawn(), passing in our sync callable.
- Inside greenlet_spawn() we create a new greenlet to run our synchronous callable, with its parent greenlet set to our async greenlet runner. This is the link between sync and async: inside the greenlet everything is synchronous, but it can yield coroutines to the parent (async), which then runs them on the loop.
- Inside the greenlet we begin executing our synchronous code, which does NOT know anything about asyncio. Eventually it hits await_(a_add(1, 2)). Calling a_add(1, 2) gives us a coroutine, which gets passed to await_().
- Inside the await_() helper we switch contexts back to the parent "async" world, passing along our a_add() coroutine.
- Back in the async world, we await the coroutine. The event loop runs a_add() and gives us back the result.
- The async greenlet_spawn() helper then passes that result back into the synchronous greenlet as the return value of our await_() call. Synchronous execution resumes and returns the result.
- At this point the greenlet running our synchronous code has finished. greenlet_spawn() now finishes and returns the result (3) to main(), which prints it.
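The driver loop in the steps above can be imitated without greenlet by using a plain generator as the suspendable "sync" frame. This is a swapped-in technique, shown only to make the control flow visible. A generator needs an explicit `yield` at every suspension point, which is exactly the limitation greenlet removes. `drive` and `synchronous_gen` are hypothetical names, not Peewee APIs.

```python
import asyncio


async def a_add(x, y):
    await asyncio.sleep(0.01)
    return x + y


def synchronous_gen():
    # With a generator, each suspension needs an explicit `yield`;
    # greenlet lets ordinary functions suspend without it.
    result = yield a_add(1, 2)       # plays the role of await_(a_add(1, 2))
    total = yield a_add(result, 10)
    return total


async def drive(gen_fn):
    """Generator-based analogue of greenlet_spawn (hypothetical helper)."""
    gen = gen_fn()
    try:
        coro = gen.send(None)        # run the sync side until it suspends
        while True:
            try:
                value = await coro   # the event loop runs the coroutine
            except Exception as exc:
                coro = gen.throw(exc)    # surface the error at the suspension point
            else:
                coro = gen.send(value)   # resume the sync side with the result
    except StopIteration as stop:
        return stop.value            # the sync function's return value


print(asyncio.run(drive(synchronous_gen)))  # 13
```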
The skeptical can verify that our synchronous callable is running asynchronously:
```python
import time

async def run_several():
    tasks = [greenlet_spawn(synchronous) for i in range(10)]
    print(await asyncio.gather(*tasks))

start = time.perf_counter()
asyncio.run(run_several())
print(time.perf_counter() - start)  # 1.00...
```
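For contrast, the sketch below shows what the greenlet bridge avoids: a synchronous call that blocks inside a coroutine stalls every other task on the loop. It uses only `time.sleep` versus `asyncio.sleep` with a short, arbitrary delay and does not involve Peewee.

```python
import asyncio
import time

DELAY = 0.05  # seconds; kept small so the demo runs quickly


async def blocks_loop():
    time.sleep(DELAY)           # synchronous sleep: the whole event loop stalls


async def yields_to_loop():
    await asyncio.sleep(DELAY)  # suspends only this task


async def timed(factory, n=10):
    start = time.perf_counter()
    await asyncio.gather(*(factory() for _ in range(n)))
    return time.perf_counter() - start


blocking = asyncio.run(timed(blocks_loop))
cooperative = asyncio.run(timed(yields_to_loop))
print(f"blocking: {blocking:.2f}s, cooperative: {cooperative:.2f}s")
```

The blocking version takes roughly ten times DELAY because the tasks run back to back. The cooperative version finishes in roughly one DELAY.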
In your code you should never need to use greenlet_spawn() or await_() directly. Instead, use the run() method, which ensures synchronous code does not block the event loop when queries execute:
```python
import asyncio
from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')

    def work():
        print(db.execute_sql('select 1').fetchall())

    await db.run(work)

asyncio.run(demo())  # prints [(1,)]
```
As described earlier, Peewee provides a number of async-ready helper methods for common operations, so the run() helper can be avoided in many cases:
```python
import asyncio
from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    curs = await db.aexecute_sql('select 1')
    print(curs.fetchall())

asyncio.run(demo())  # prints [(1,)]
```
Note that obtaining the results from the cursor does not happen asynchronously in this example (i.e. we do not call print(await curs.fetchall())). Internally, Peewee does await the fetching of results from the cursor, but all rows are loaded before the cursor is returned to the caller. This ensures consistency with existing behavior. For server-side cursors with async result streaming, see db.iterate().
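The sketch below contrasts the two result-delivery styles just described: everything loaded up front versus rows streamed through an async iterator. It uses the synchronous stdlib `sqlite3` module, so it only illustrates the API shape and performs no real async I/O or server-side cursor work. `fetch_all`, `stream`, and the batch size are hypothetical.

```python
import asyncio
import sqlite3


def make_conn(n_rows):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE item (id INTEGER PRIMARY KEY)")
    conn.executemany("INSERT INTO item (id) VALUES (?)",
                     [(i,) for i in range(n_rows)])
    return conn


async def fetch_all(conn, sql):
    # Eager: every row is materialized before the caller sees any of them.
    return conn.execute(sql).fetchall()


async def stream(conn, sql, batch_size=100):
    # Lazy: rows are handed out a batch at a time, yielding to the
    # event loop between batches.
    cursor = conn.execute(sql)
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        for row in rows:
            yield row
        await asyncio.sleep(0)  # let other tasks run between batches


async def main():
    conn = make_conn(250)
    sql = "SELECT id FROM item ORDER BY id"
    eager = await fetch_all(conn, sql)
    lazy = [row async for row in stream(conn, sql)]
    conn.close()
    return eager, lazy


eager, lazy = asyncio.run(main())
print(len(eager), eager == lazy)  # 250 True
```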
Future
Over time, the surface of the asyncio API will probably grow to encompass more use-cases where an async helper would make sense. My desire was to start small, though, as that's the spirit of Peewee.
Links:
- Documentation
- Code: currently in the master branch but will be released soonish.
- Mike Bayer's excellent post on asyncio and databases, which is worth a read for anyone pondering the utility or futility of asyncio and ORMs.
- My own thoughts on asyncio from a couple years ago
