Asyncio Finally Got Peewee

pwasyncio

Which way I fly is hell; myself am hell;

And in the lowest deep a lower deep

Still threat'ning to devour me opens wide,

To which the hell I suffer seems a heav'n.

I feel that it is high time that Peewee had an asyncio story. I've avoided this for years. Database operations and transactions, while needing to support some degree of concurrency, are still rather heavy operations. Postgres default config specifies a max of 100 connections, for example, and SQLite doesn't even support concurrent writers. Nonetheless, the writing has been on the wall for years now, and much like toml, type whispering, and the withering bouquet of packaging tools waiting to be tossed, asyncio is not going anywhere. Peewee is still a synchronous ORM, but in order to work with the ever-widening sphere of async-first web frameworks and database drivers, it was time to come up with a plan.

That inscrutable thing is chiefly what I hate; and be the white whale agent, or be the white whale principal, I will wreak that hate upon him.

Rather than rewriting Peewee to be async-first or building a separate coroutine-based implementation, playhouse.pwasyncio instead provides a thin compatibility layer that bridges synchronous Peewee with the asyncio event loop. This is accomplished using greenlet, the same approach used by SQLAlchemy. When database I/O occurs inside the Peewee-managed greenlet, control is yielded to the event loop until the async driver completes its operation. This makes it possible to use existing synchronous code and rely on Peewee's async bridging to handle yielding to the loop whenever queries are executed.

To get started use one of the Async database implementations. Typically this is the only thing you will need in order to use Peewee with asyncio:

import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('my_app.db')

class User(db.Model):
    name = TextField()

Queries must be executed through an async execution method. This ensures that when blocking would occur, control is properly yielded to the event loop. The database context (async with db) acquires a connection from the pool and releases it on exit:

async def main():
    async with db:
        await db.acreate_tables([User])

        # Create a new user in a transaction.
        async with db.atomic() as txn:
            user = await db.run(User.create, name='Huey')

        # Fetch a single row from the database.
        huey = await db.get(User.select().where(User.name == 'Huey'))
        assert huey.name == user.name

        # Execute a query and iterate results.
        for user in await db.list(User.select().order_by(User.name)):
            print(user.name)

        # Async lazy result fetching (uses server-side cursors where
        # available).
        query = User.select().order_by(User.name)
        async for user in db.iterate(query):
            print(user.name)

    await db.close_pool()

asyncio.run(main())
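
To make the acquire-on-enter, release-on-exit idea concrete, here is a toy sketch using only the standard library. It is not how pwasyncio is implemented: ToyPool, connection() and worker() are hypothetical names invented for illustration, and the "connections" are just strings. It only shows the general shape of an async context manager that waits for a free connection without blocking the event loop and always hands it back.

```python
import asyncio
from contextlib import asynccontextmanager

class ToyPool:
    """Hypothetical stand-in for a driver's connection pool."""

    def __init__(self, size):
        self.size = size
        self._free = None   # built lazily so it binds to the running loop
        self.in_use = 0
        self.peak = 0

    @asynccontextmanager
    async def connection(self):
        if self._free is None:
            self._free = asyncio.Queue()
            for i in range(self.size):
                self._free.put_nowait(f'conn-{i}')
        # Acquire: suspends this task (not the loop) if the pool is exhausted.
        conn = await self._free.get()
        self.in_use += 1
        self.peak = max(self.peak, self.in_use)
        try:
            yield conn
        finally:
            # Release on exit, even if the body raised.
            self.in_use -= 1
            self._free.put_nowait(conn)

async def worker(pool, n):
    async with pool.connection() as conn:
        await asyncio.sleep(0.01)   # pretend to run a query
        return n, conn

async def main():
    pool = ToyPool(size=2)
    results = await asyncio.gather(*(worker(pool, n) for n in range(5)))
    return pool, results

pool, results = asyncio.run(main())
print(pool.peak, pool.in_use)
```

Five workers share two connections here, so at most two are ever checked out at once and all of them are returned by the time main() finishes.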

The most general way to make database operations non-blocking with Peewee is to write your code synchronously and pass a callable to the run() method, e.g.:

# Single operation:
user = await db.run(User.create, name='Alice')

# Multi-step function:
def register(username, bio):
    with db.atomic():
        user = User.create(name=username)
        Profile.create(user=user, bio=bio)
        return user

user = await db.run(register, 'alice', 'Python developer')

db.run() is recommended when:

Additionally, you can use the various async helpers:

# Execute any query and get its natural return type.
query = (User.delete()
         .where(User.name.contains('billing'))
         .returning(User.name))
for spammer in await db.aexecute(query):
    print('Deleted', spammer.name)

# Use a transaction:
async with db.atomic() as tx:
    await db.run(User.create, name='Bob')

# SELECT and return one model instance (raises DoesNotExist if none).
user = await db.get(User.select().where(User.name == 'Alice'))

# SELECT and return a list.
users = await db.list(User.select().order_by(User.name))

# SELECT and stream results from the database asynchronously.
users = [user async for user in db.iterate(User.select())]

# SELECT and return a scalar value.
count = await db.scalar(User.select(fn.COUNT(User.id)))

# Or use the shortcut.
count = await db.count(User.select())

# CREATE TABLE / DROP TABLE:
await db.acreate_tables([User, Tweet])
await db.adrop_tables([User, Tweet])

# Raw SQL:
cursor = await db.aexecute_sql('SELECT 1')
print(cursor.fetchall())   # [(1,)]

For more details, see the documentation.

FastAPI example

Peewee's asyncio integration can be used with async-first frameworks like FastAPI.

The following example demonstrates how to:

from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI
from peewee import *
from playhouse.pwasyncio import *



db = AsyncPostgresqlDatabase('peewee_test')

class User(db.Model):
    username = TextField()

async def get_db():
    async with db:
        yield db

@asynccontextmanager
async def lifespan(app):
    async with db:
        await db.acreate_tables([User])
    yield
    await db.close_pool()

app = FastAPI(lifespan=lifespan)

@app.get('/users')
async def list_users(db=Depends(get_db)):
    return await db.list(User.select().dicts())

Peewee also comes with pydantic support - see the example for more details.

How does it work?

Synchronous ORM code runs inside a greenlet, and async I/O is bridged internally by Peewee using two helpers, greenlet_spawn() and await_(). The greenlet_spawn() helper runs synchronous code inside a greenlet, and can be suspended and resumed in order to yield to the asyncio event loop. Yielding is done by the await_() helper, which suspends the greenlet and passes control to the asyncio coroutine. View the code to see the full implementation.

Here's a small example showing the execution path and how context switches between sync and async worlds:

import asyncio
from playhouse.pwasyncio import *

# Synchronous function gets run by the greenlet helper.
def synchronous():
    return await_(a_add(1, 2))

# Normal async function.
async def a_add(x, y):
    await asyncio.sleep(1)
    return x + y

async def main():
    result = await greenlet_spawn(synchronous)
    print(result)

asyncio.run(main())  # After 1 second, prints 3

When this runs:

  1. main() is declared async def and runs in the "async" world. It awaits greenlet_spawn, passing in our sync callable.
  2. Inside greenlet_spawn we create a new greenlet to run our synchronous callable, with its parent greenlet set to our async greenlet runner. This is the link between sync and async. Inside the greenlet everything is synchronous, but it can yield coroutines to the parent (async), which then runs them on the loop.
  3. Inside the greenlet we begin executing our synchronous code - it does NOT know anything about asyncio. Eventually it hits await_(a_add(1, 2)). When we call a_add(1, 2) Python gives us a coroutine, which gets passed to await_().
  4. Inside the await_() helper we switch contexts back to the parent "async" world, passing our a_add() coroutine.
  5. Back in the async world, we await the coroutine. The event loop runs a_add() and gives us back the result.
  6. The async greenlet_spawn() helper then passes that result back into the synchronous greenlet as the return value of our await_() call. Synchronous execution resumes and returns the result.
  7. At this point the greenlet running our synchronous code has finished. greenlet_spawn() now finishes and returns the result (3) to main(), which gets printed.
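
The steps above can be mimicked with the standard library alone. This is only an analogy, not Peewee's implementation: a generator stands in for the greenlet, a bare yield stands in for await_(), and a hypothetical spawn() coroutine stands in for greenlet_spawn(). The important difference is that a real greenlet can switch out from any depth of ordinary function calls, which is what lets unmodified synchronous code take part. A generator can only yield from its own frame.

```python
import asyncio

# Normal async function (shortened sleep so the sketch runs quickly).
async def a_add(x, y):
    await asyncio.sleep(0.01)
    return x + y

# Stand-in for the synchronous callable. With a real greenlet this would be
# a plain function calling await_(); the generator needs an explicit yield.
def synchronous():
    result = yield a_add(1, 2)   # hand the coroutine to the async parent
    return result

# Stand-in for greenlet_spawn(): drive the sync side and await whatever
# it hands over, then resume it with the result.
async def spawn(fn, *args):
    gen = fn(*args)
    try:
        awaitable = next(gen)                 # run until the first hand-off
        while True:
            try:
                value = await awaitable       # async world runs the coroutine
            except Exception as exc:
                awaitable = gen.throw(exc)    # surface errors in the sync code
            else:
                awaitable = gen.send(value)   # resume sync code with the result
    except StopIteration as done:
        return done.value                     # sync code finished

result = asyncio.run(spawn(synchronous))
print(result)
```

The while loop is the part worth noticing: the synchronous side may hand over any number of awaitables before it finishes, and each one is awaited on the event loop before control returns to it.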

The skeptical can verify that our synchronous callable is running asynchronously:

async def run_several():
    tasks = [greenlet_spawn(synchronous) for i in range(10)]
    print(await asyncio.gather(*tasks))

import time
start = time.perf_counter()
asyncio.run(run_several())
print(time.perf_counter() - start)  # 1.00...

In your code you should never need to use greenlet_spawn() or await_() directly. Instead use the run() method, which ensures synchronous code does not block during query executions:

import asyncio
from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    def work():
        print(db.execute_sql('select 1').fetchall())
    await db.run(work)

asyncio.run(demo())  # prints [(1,)]

As described earlier, Peewee provides a number of async-ready helper methods for common operations, so the run() helper can be avoided in many cases:

import asyncio
from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    curs = await db.aexecute_sql('select 1')
    print(curs.fetchall())

asyncio.run(demo())  # prints [(1,)]

Note that obtaining the results from the cursor does not happen asynchronously in this example (e.g. we do not call print(await curs.fetchall())). Internally Peewee does await fetching the results from the cursor, but the rows are all loaded before the cursor is returned to the caller. This ensures consistency with existing behavior. For server-side cursors with async result streaming, see db.iterate().

Future

Over time, the surface of the asyncio API will probably grow to encompass more use-cases where an async helper would make sense. My desire was to start small, though, as that's the spirit of Peewee.

