Asyncio Finally Got Peewee
Which way I fly is hell; myself am hell;
And in the lowest deep a lower deep
Still threat'ning to devour me opens wide,
To which the hell I suffer seems a heav'n.
I feel that it is high time that Peewee had an asyncio story. I've avoided this for years. In my opinion, asyncio is a lousy design for anything beyond massively concurrent I/O multiplexing. Database operations and transactions, while needing to support some degree of concurrency (SQLite doesn't even manage that), are still rather heavy operations - the default Postgres config specifies a max of 100 connections. Nonetheless, the writing has been on the wall for years now, and much like toml, type whispering, and the withering bouquet of packaging tools waiting to be tossed, asyncio is not going anywhere. Peewee remains a synchronous ORM, but in order to work with the ever-widening sphere of async-first web frameworks and database drivers, it was time to come up with a plan.
That inscrutable thing is chiefly what I hate; and be the white whale agent, or be the white whale principal, I will wreak that hate upon him.
Peewee is a synchronous library by design. Rather than rewriting Peewee to be async-first or building an entire parallel coroutine-based implementation, we instead have a thin compatibility layer that bridges synchronous Peewee with the asyncio event loop. This is accomplished using greenlet - the same approach used by SQLAlchemy. When database I/O occurs inside the Peewee-managed greenlet, control is yielded to the event loop until the async driver completes its operation. This allows you to use existing synchronous code and rely on Peewee's async bridging to handle yielding to the loop whenever queries are executed.
To get started, use one of the async database implementations:
    import asyncio
    from peewee import *
    from playhouse.pwasyncio import AsyncSqliteDatabase

    db = AsyncSqliteDatabase('my_app.db')

    class User(db.Model):
        name = TextField()
All queries need to be executed through one of the async execution methods. The database context (async with db) acquires a connection from the pool and releases it on exit:
    async def main():
        async with db:
            await db.acreate_tables([User])

            user = await db.run(User.create, name='Huey')
            huey = await db.get(User.select().where(User.name == 'Huey'))
            assert huey.name == user.name

            for user in await db.list(User.select().order_by(User.name)):
                print(user.name)

        await db.close_pool()

    asyncio.run(main())
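For readers unfamiliar with the acquire-on-enter, release-on-exit pattern, here is a minimal standard-library sketch of the idea. TinyPool, its connection() method and the 'conn-N' strings are all hypothetical stand-ins invented for illustration; this is not how Peewee's pool is implemented.

```python
import asyncio
from contextlib import asynccontextmanager

class TinyPool:
    # Hypothetical pool: the "connections" are just strings in a queue.
    def __init__(self, size):
        self._free = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait(f'conn-{i}')

    @asynccontextmanager
    async def connection(self):
        conn = await self._free.get()    # waits if every connection is busy
        try:
            yield conn
        finally:
            self._free.put_nowait(conn)  # always hand the connection back

async def worker(pool, n, log):
    async with pool.connection() as conn:
        log.append((n, conn))
        await asyncio.sleep(0.01)        # pretend to run a query

async def main():
    pool = TinyPool(size=2)
    log = []
    # Four workers share two connections; two of them have to wait.
    await asyncio.gather(*(worker(pool, n, log) for n in range(4)))
    return log

log = asyncio.run(main())
print(log)
```

Because the release sits in a finally block, a connection goes back to the pool even when the body raises, which is the property the async with db block is relied on for above.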
When running Peewee ORM code, the most general way to make things non-blocking is to write your code synchronously and pass a callable to the run() method, e.g.:
    # Single operation:
    user = await db.run(User.create, name='Alice')

    # Multi-step function:
    def register(username, bio):
        with db.atomic():
            user = User.create(name=username)
            Profile.create(user=user, bio=bio)
        return user

    user = await db.run(register, 'alice', 'Python developer')
db.run() is recommended when:
- You have existing synchronous code you want to call from async.
- A single operation involves multiple queries (e.g. a transaction).
Additionally, you can use the various async helpers:
    # Execute any query and get its natural return type.
    cursor = await db.aexecute(query)

    # Use a transaction:
    async with db.atomic() as tx:
        await db.run(User.create, name='Bob')

    # SELECT and return one model instance (raises DoesNotExist if none).
    user = await db.get(User.select().where(User.name == 'Alice'))

    # SELECT and return a list.
    users = await db.list(User.select().order_by(User.name))

    # SELECT and return a scalar value.
    count = await db.scalar(User.select(fn.COUNT('*')))

    # CREATE TABLE / DROP TABLE:
    await db.acreate_tables([User, Tweet])
    await db.adrop_tables([User, Tweet])

    # Raw SQL:
    cursor = await db.aexecute_sql('SELECT 1')
    print(cursor.fetchall())  # [(1,)]
For more details, see the documentation.
FastAPI example
Peewee's asyncio integration can be used with async-first frameworks like FastAPI.
The following example demonstrates how to:
- Ensure a connection is opened and closed for each request.
- Create tables/resources when the app server starts.
- Shut down the connection pool when the app server exits.
    from fastapi import FastAPI
    from peewee import *
    from playhouse.pwasyncio import *

    app = FastAPI()

    db = AsyncPostgresqlDatabase('peewee_test', host='10.8.0.1', user='postgres')

    @app.middleware('http')
    async def database_connection(request, call_next):
        await db.aconnect()  # Obtain connection from connection pool.
        try:
            response = await call_next(request)
        finally:
            await db.aclose()  # Release connection back to pool.
        return response

    @app.on_event('startup')
    async def on_startup():
        async with db:
            await db.acreate_tables([Model1, Model2, Model3, ...])

    @app.on_event('shutdown')
    async def on_shutdown():
        await db.close_pool()
Here are some examples of executing an async query:
    @app.get('/users')
    async def list_users():
        return await db.list(User.select().dicts())

    @app.post('/users')
    async def create_user(name: str):
        async with db.atomic() as tx:
            user = await db.run(User.create, name=name)
        return {'id': user.id, 'name': user.name}
How does it work?
Synchronous ORM code runs inside a greenlet, and async I/O is bridged internally by Peewee using two helpers, greenlet_spawn() and await_(). The
greenlet_spawn() helper runs synchronous code, but can be suspended and
resumed in order to yield to the asyncio event loop. Yielding is done by the
await_() helper, which suspends the greenlet and passes control to the
asyncio coroutine. View the code to see the full implementation.
Here's a small example showing the execution path and how context switches between sync and async worlds:
    import asyncio
    from playhouse.pwasyncio import *

    # Synchronous function gets run by the greenlet helper.
    def synchronous():
        return await_(a_add(1, 2))

    # Normal async function.
    async def a_add(x, y):
        await asyncio.sleep(1)
        return x + y

    async def main():
        result = await greenlet_spawn(synchronous)
        print(result)

    asyncio.run(main())  # After 1 second, prints 3
When this runs:
- main() is declared async def and runs in the "async" world. It awaits greenlet_spawn, passing in our sync callable.
- Inside greenlet_spawn we create a new greenlet to run our synchronous callable, with its parent greenlet set to our async greenlet runner. This is the link between sync and async. Inside the greenlet everything is synchronous, but it can yield coroutines to the parent (async), which then runs them on the loop.
- Inside the greenlet we begin executing our synchronous code - it does NOT know anything about asyncio. Eventually it hits await_(a_add(1, 2)). When we call a_add(1, 2) Python gives us a coroutine, which gets passed to await_().
- Inside the await_() helper we switch contexts back to the parent "async" world, passing our a_add() coroutine.
- Back in the async world, we await the coroutine. The event loop runs a_add() and gives us back the result.
- The async greenlet_spawn() helper then passes that result back into the synchronous greenlet as the return value of our await_() call. Synchronous execution resumes and returns the result.
- At this point the greenlet running our synchronous code has finished. greenlet_spawn() now finishes and returns the result (3) to main(), which gets printed.
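The hand-off in the steps above can be imitated with the standard library alone, which may help if you want to poke at the control flow without installing anything. The sketch below swaps greenlets for plain generators: yield stands in for await_(), and generator_spawn() is a hypothetical stand-in for greenlet_spawn(). It is an analogy, not Peewee's implementation, and it does not forward exceptions from the awaited coroutine back into the synchronous side.

```python
import asyncio

async def a_add(x, y):
    # Ordinary coroutine, standing in for an async database driver call.
    await asyncio.sleep(0.01)
    return x + y

def synchronous():
    # "Sync" side: hands a coroutine to the parent and gets the result back.
    # Each `yield` plays the role of an await_() call.
    total = yield a_add(1, 2)
    doubled = yield a_add(total, total)
    return doubled

async def generator_spawn(fn, *args):
    # Hypothetical analogue of greenlet_spawn(): drive the generator,
    # awaiting each coroutine it hands over and sending the result back in.
    gen = fn(*args)
    try:
        pending = next(gen)             # run until the first hand-off
        while True:
            result = await pending      # the event loop runs the coroutine
            pending = gen.send(result)  # resume the sync side with the result
    except StopIteration as stop:
        return stop.value               # the generator's return value

print(asyncio.run(generator_spawn(synchronous)))  # prints 6
```

The important difference is that a generator can only hand control back from its own frame, so every function in the call chain would have to cooperate. A greenlet can switch from anywhere in the call stack, which is why unmodified synchronous ORM code can run inside one.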
The skeptical can verify that our synchronous callable is running asynchronously:
    async def run_several():
        tasks = [greenlet_spawn(synchronous) for i in range(10)]
        print(await asyncio.gather(*tasks))

    import time
    start = time.perf_counter()
    asyncio.run(run_several())
    print(time.perf_counter() - start)  # 1.00...
In your code you should never need to use greenlet_spawn() or await_() directly, however. Peewee provides a general-purpose run() method which ensures synchronous Peewee code does not block during query executions:
    import asyncio
    from playhouse.pwasyncio import *

    async def demo():
        db = AsyncSqliteDatabase(':memory:')

        def work():
            print(db.execute_sql('select 1').fetchall())

        await db.run(work)

    asyncio.run(demo())  # prints [(1,)]
As described earlier, Peewee provides a number of async-ready helper methods for common operations, so the run() helper can be avoided in many cases:
    import asyncio
    from playhouse.pwasyncio import *

    async def demo():
        db = AsyncSqliteDatabase(':memory:')
        curs = await db.aexecute_sql('select 1')
        print(curs.fetchall())

    asyncio.run(demo())  # prints [(1,)]
Note that obtaining the results from the cursor does not happen asynchronously
(e.g. we do not call print(await curs.fetchall())). Internally Peewee does
await fetching the results from the cursor, but the rows are all loaded before
the cursor is returned to the caller. This ensures consistency with existing
behavior, though in future versions we may add support for streaming cursor
results (via Postgres server-side cursors).
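To make the eager-loading behavior concrete, here is a small standard-library sketch of the general shape: await the driver once, then hand the caller a plain synchronous cursor over rows that are already in memory. FakeAsyncCursor, BufferedCursor and aexecute_sketch() are hypothetical names made up for this illustration and are not Peewee's actual classes.

```python
import asyncio

class FakeAsyncCursor:
    # Hypothetical stand-in for an async driver's cursor.
    def __init__(self, rows):
        self._rows = rows

    async def fetchall(self):
        await asyncio.sleep(0)  # pretend to wait on the network
        return list(self._rows)

class BufferedCursor:
    # Plain synchronous cursor over rows that are already loaded.
    def __init__(self, rows):
        self._rows = rows
        self._pos = 0

    def fetchone(self):
        if self._pos >= len(self._rows):
            return None
        row = self._rows[self._pos]
        self._pos += 1
        return row

    def fetchall(self):
        rows = self._rows[self._pos:]
        self._pos = len(self._rows)
        return rows

async def aexecute_sketch(driver_cursor):
    # The only await happens here, before the caller ever sees the cursor.
    rows = await driver_cursor.fetchall()
    return BufferedCursor(rows)

async def demo():
    curs = await aexecute_sketch(FakeAsyncCursor([(1,), (2,), (3,)]))
    first = curs.fetchone()  # no await needed, rows are already in memory
    rest = curs.fetchall()
    return first, rest

print(asyncio.run(demo()))  # prints ((1,), [(2,), (3,)])
```

The trade-off is memory: the whole result set is held at once, which is the cost a streaming or server-side cursor would avoid.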
Future
Over time, the surface of the asyncio API will probably grow to encompass more use-cases where an async helper would make sense. My desire was to start small, though - it's the spirit of Peewee, after all.
Links:
- Documentation
- Code - currently in master branch but will be released soonish.
- Mike Bayer's excellent post on asyncio and databases, which is worth a read for anyone pondering the utility or futility of asyncio and ORMs.
- My own thoughts on asyncio from a couple years ago
