Asyncio Finally Got Peewee

pwasyncio

I feel that it is high time that Peewee had an asyncio story. I've avoided this for years. Database operations and transactions, while needing to support some degree of concurrency, are still rather heavy operations. Postgres' default config specifies a max of 100 connections, for example, and SQLite doesn't even support concurrent writers. Nonetheless, the writing has been on the wall for years now, and much like toml, type whispering, and the withering bouquet of packaging tools waiting to be tossed, asyncio is not going anywhere. Peewee is still a synchronous ORM, but in order to work with the ever-widening sphere of async-first web frameworks and database drivers, it was time to come up with a plan.

Rather than rewriting Peewee to be async-first or building a separate coroutine-based implementation, playhouse.pwasyncio instead provides a thin compatibility layer that bridges synchronous Peewee with the asyncio event loop. This is accomplished using greenlet, the exact same approach used by SQLAlchemy. When database I/O occurs inside the Peewee-managed greenlet, control is yielded to the event loop until the async driver completes its operation. This makes it possible to use existing synchronous code and rely on Peewee's async bridging to handle yielding to the loop whenever queries are executed.

To get started, use one of the async database implementations. Typically this is the only thing you will need in order to use Peewee with asyncio:

import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('my_app.db')

class User(db.Model):
    name = TextField()

Queries must be executed through an async execution method. This ensures that when blocking would occur, control is properly yielded to the event loop. The database context (async with db) acquires a connection from the pool and releases it on exit:

async def main():
    async with db:
        await db.acreate_tables([User])

        # Create a new user in a transaction.
        async with db.atomic():
            user = await User.acreate(name='Huey')

        # Fetch a single row from the database.
        huey = await User.aget(User.name == 'Huey')
        assert huey.id == user.id

        # Update a row.
        huey.name = 'Huey-zai'
        await huey.asave()

        # Execute a query and iterate the buffered results.
        for user in await User.select().order_by(User.name).aexecute():
            print(user.name)

        # Stream results from the database (uses server-side cursors where
        # available).
        async for user in db.iterate(User.select().order_by(User.name)):
            print(user.name)

    await db.close_pool()

asyncio.run(main())

Models bound to an async database get a-prefixed coroutine versions of the usual row methods (acreate, aget, asave, adelete_instance, etc). These are the most direct way to read and write individual rows:

user = await User.acreate(name='Huey')
user.name = 'Huey-zai'
await user.asave()

huey = await User.aget(User.name == 'Huey-zai')
obj, created = await User.aget_or_create(name='Mickey')

await huey.adelete_instance()

The query-builder methods (select(), where(), and the rest) only build SQL, so they need no async variant. The one query method that performs the I/O, execute(), has a corresponding aexecute():

# SELECT returns a buffered result wrapper:
for user in await User.select().order_by(User.name).aexecute():
    print(user.name)

# Writes return their usual values.
pk = await User.insert(name='Huey').aexecute()
n = await User.update(name='Zaizee').where(User.name == 'Huey').aexecute()

# Returning works as expected.
update = PageCount.update(count=PageCount.count + 1).where(PageCount.url == url)
for row in await update.returning(PageCount).aexecute():
    print('%s has %s views.' % (row.url, row.count))

The one sharp corner is lazy foreign-key access. Touching tweet.user outside the bridge fires a synchronous query and raises MissingGreenletBridge. Select the relation up front, eagerly load it, or load it explicitly with await tweet.afetch(Tweet.user), which fetches the related row and caches it on the instance so later attribute access is free.

For arbitrary synchronous code, the most general way to make database operations non-blocking is to write your code synchronously and pass a callable to the run() method, e.g.:

# Single operation:
user = await db.run(User.create, name='Alice')

# Multi-step function:
def register(username, bio):
    with db.atomic():
        user = User.create(name=username)
        Profile.create(user=user, bio=bio)
        return user

user = await db.run(register, 'alice', 'Python developer')

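db.run() is recommended when you have existing synchronous Peewee code, or a multi-step function like register() above, that should run without blocking the event loop.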

Additionally, you can use the various async helpers:

# Execute any query and get its natural return type.
spammers = (User.delete()
            .where(User.name.contains('billing'))
            .returning(User.name))
for spammer in await db.aexecute(spammers):
    print('Deleted', spammer.name)

# db.aexecute(query) and query.aexecute() are equivalent:
recent = await User.select().order_by(User.id.desc()).aexecute()

# Use a transaction:
async with db.atomic() as tx:
    await db.run(User.create, name='Bob')

# SELECT and return one model instance (raises DoesNotExist if none).
user = await db.get(User.select().where(User.name == 'Alice'))

# Or use the model method:
user = await User.aget(User.name == 'Alice')

# SELECT and return a list.
users = await db.list(User.select().order_by(User.name))

# SELECT and stream results from the database asynchronously.
users = [user async for user in db.iterate(User.select())]

# SELECT and return a scalar value.
count = await db.scalar(User.select(fn.COUNT(User.id)))

# Or use the shortcut.
count = await db.count(User.select())

# CREATE TABLE / DROP TABLE:
await db.acreate_tables([User, Tweet])
await db.adrop_tables([User, Tweet])

# Raw SQL:
cursor = await db.aexecute_sql('SELECT 1')
print(cursor.fetchall())   # [(1,)]

For more details, see the documentation.

FastAPI example

Peewee's asyncio integration can be used with async-first frameworks like FastAPI.

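The following example demonstrates how to open a connection for each request with a dependency, create tables at startup and close the pool at shutdown in a lifespan handler, and run a query from a route: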

from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI
from peewee import *
from playhouse.pwasyncio import *


db = AsyncPostgresqlDatabase('peewee_test')

class User(db.Model):
    username = TextField()

async def get_db():
    async with db:
        yield db

@asynccontextmanager
async def lifespan(app):
    async with db:
        await db.acreate_tables([User])
    yield
    await db.close_pool()

app = FastAPI(lifespan=lifespan)

@app.get('/users')
async def list_users(db=Depends(get_db)):
    return await db.list(User.select().dicts())

Peewee also comes with pydantic support - see the example for more details.

How does it work?

Synchronous ORM code runs inside a greenlet, and async I/O is bridged internally by Peewee using two helpers, greenlet_spawn() and await_(). The greenlet_spawn() helper runs synchronous code inside a greenlet, and can be suspended and resumed in order to yield to the asyncio event loop. Yielding is done by the await_() helper, which suspends the greenlet and passes control to the asyncio coroutine. View the code to see the full implementation.

Here's a small example showing the execution path and how context switches between sync and async worlds:

import asyncio
from playhouse.pwasyncio import *

# Synchronous function gets run by the greenlet helper.
def synchronous():
    return await_(a_add(1, 2))

# Normal async function.
async def a_add(x, y):
    await asyncio.sleep(1)
    return x + y

async def main():
    result = await greenlet_spawn(synchronous)
    print(result)

asyncio.run(main())  # After 1 second, prints 3

When this runs:

  1. main() is declared async def and runs in the "async" world. It awaits greenlet_spawn, passing in our sync callable.
  2. Inside greenlet_spawn we create a new greenlet to run our synchronous callable, with its parent greenlet set to our async greenlet runner. This is the link between sync and async Python. Inside the greenlet everything is synchronous, but it can yield coroutines to the parent (async), which then runs them on the loop.
  3. Inside the greenlet we begin executing our synchronous code - it does NOT know anything about asyncio. Eventually it hits await_(a_add(1, 2)). When we call a_add(1, 2) Python gives us a coroutine, which gets passed to await_().
  4. Inside the await_() helper we switch contexts back to the parent "async" world, passing our a_add() coroutine.
  5. Back in the async world, we await the coroutine. The event loop runs a_add() and gives us back the result.
  6. The async greenlet_spawn() helper then passes that result back into the synchronous greenlet as the return value of our await_() call. Synchronous execution resumes and returns the result.
  7. At this point the greenlet running our synchronous code has finished. greenlet_spawn() now finishes and returns the result (3) to main(), which gets printed.
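The hand-off in the steps above can be imitated with nothing but the standard library. The sketch below is an analogue, not Peewee's implementation: it swaps the greenlet for a plain generator, so the "synchronous" function has to yield explicitly, which is exactly the intrusion that greenlets avoid. The names generator_spawn and synchronous are made up for illustration, and error propagation into the child is left out.

```python
import asyncio

# Illustration only: a generator stands in for the greenlet. Real Peewee
# code needs no `yield`; the greenlet switch happens inside await_().

async def a_add(x, y):
    await asyncio.sleep(0.01)
    return x + y

def synchronous():
    # Steps 3-4: hand a coroutine up to the parent and pause here.
    result = yield a_add(1, 2)
    # Step 6: the parent resumed us with the awaited value.
    return result

async def generator_spawn(fn, *args):
    child = fn(*args)                  # step 2: create the child context
    try:
        coro = next(child)             # run the child until its first hand-off
        while True:
            value = await coro         # step 5: the event loop runs the coroutine
            coro = child.send(value)   # step 6: resume the child with the result
    except StopIteration as stop:
        return stop.value              # step 7: the child finished

if __name__ == '__main__':
    print(asyncio.run(generator_spawn(synchronous)))  # prints 3
```

The driver loop is the part worth noticing: only the parent ever awaits, and the child only ever sees ordinary return values.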

The skeptical can verify that our synchronous callable is running asynchronously:

async def run_several():
    tasks = [greenlet_spawn(synchronous) for i in range(10)]
    print(await asyncio.gather(*tasks))

import time
start = time.perf_counter()
asyncio.run(run_several())
print(time.perf_counter() - start)  # 1.00...
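For contrast, here is a rough standard-library sketch of what the timing would look like if the callable really did block the loop. It does not use Peewee at all; yielding_task and blocking_task are hypothetical stand-ins, and the sleeps are shortened to 0.1 seconds so it runs quickly.

```python
import asyncio
import time

async def yielding_task():
    # Suspends this task; the loop is free to run the others meanwhile.
    await asyncio.sleep(0.1)

async def blocking_task():
    # Blocks the whole event loop; nothing else runs until it returns.
    time.sleep(0.1)

async def timed(task_factory, n=5):
    # Run n copies concurrently and report the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(task_factory() for _ in range(n)))
    return time.perf_counter() - start

yielding_elapsed = asyncio.run(timed(yielding_task))
blocking_elapsed = asyncio.run(timed(blocking_task))
print('yielding: %.2fs, blocking: %.2fs' % (yielding_elapsed, blocking_elapsed))
```

The yielding tasks overlap and finish in roughly the time of one sleep, while the blocking tasks run back to back. The ten greenlet_spawn() calls finishing in about one second is the first pattern, not the second.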

In your code you should never need to use greenlet_spawn() or await_() directly. Instead use the run() method, which ensures synchronous code does not block during query executions:

import asyncio
from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    def work():
        print(db.execute_sql('select 1').fetchall())
    await db.run(work)

asyncio.run(demo())  # prints [(1,)]

As described earlier, Peewee provides a number of async-ready helper methods for common operations, so the run() helper can be avoided in many cases:

import asyncio
from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    curs = await db.aexecute_sql('select 1')
    print(curs.fetchall())

asyncio.run(demo())  # prints [(1,)]

Note that obtaining the results from the cursor does not happen asynchronously in this example (e.g. we do not call print(await curs.fetchall())). Internally Peewee does await fetching the results from the cursor, but the rows are all loaded before the cursor is returned to the caller. This ensures consistency with existing behavior. For server-side cursors with async result streaming, see db.iterate().
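The difference between the two styles can be sketched without a database. Everything below is hypothetical (fake_driver_rows, BufferedCursor, and execute_buffered are not Peewee APIs); it only mimics the shape of the behavior described above, where the buffered path awaits every row before handing back a cursor and the streaming path hands rows over as they arrive.

```python
import asyncio

async def fake_driver_rows():
    # Stand-in for an async driver cursor: each row involves a trip
    # through the event loop.
    for i in range(3):
        await asyncio.sleep(0)
        yield (i,)

class BufferedCursor:
    # Holds rows that were already fetched, so fetchall() is a plain call.
    def __init__(self, rows):
        self._rows = rows

    def fetchall(self):
        return list(self._rows)

async def execute_buffered():
    # All awaiting happens here, before the cursor reaches the caller.
    rows = [row async for row in fake_driver_rows()]
    return BufferedCursor(rows)

async def main():
    cursor = await execute_buffered()
    buffered = cursor.fetchall()        # no await needed
    streamed = []
    async for row in fake_driver_rows():  # rows arrive one at a time
        streamed.append(row)
    return buffered, streamed

buffered, streamed = asyncio.run(main())
print(buffered, streamed)
```

Both paths produce the same rows. What changes is when the waiting happens and how many rows sit in memory at once.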

Future

Over time, the surface of the asyncio API will probably grow to encompass more use-cases where an async helper would make sense. My desire was to start small, though, as that's the spirit of Peewee. Since this was first written the API has already grown a little, gaining a-prefixed model methods and an aexecute() method on queries, and the asyncio extension is no longer considered preliminary.
