Asyncio Finally Got Peewee

pwasyncio

Which way I fly is hell; myself am hell;

And in the lowest deep a lower deep

Still threat'ning to devour me opens wide,

To which the hell I suffer seems a heav'n.

I feel that it is high time that Peewee had an asyncio story. I've avoided this for years because asyncio is, in my opinion, a lousy design for anything beyond massively concurrent I/O multiplexing. Database operations and transactions, while needing to support some degree of concurrency (Sqlite doesn't even manage that), are still rather heavy operations - the default Postgres config specifies a max of 100 connections. Nonetheless, the writing has been on the wall for years now, and much like toml, type whispering, and the withering bouquet of packaging tools waiting to be tossed, asyncio is not going anywhere. Peewee remains a synchronous ORM, but in order to work with the ever-widening sphere of async-first web frameworks and database drivers, it was time to come up with a plan.

That inscrutable thing is chiefly what I hate; and be the white whale agent, or be the white whale principal, I will wreak that hate upon him.

While I still prefer threads or gevent, the implementation in playhouse/pwasyncio.py presents a fairly manageable surface area (at the time of writing, under 500 lines of code). Over time I'm sure this will grow, but this is a beginning at any rate.

The guiding principle of the new asyncio layer is that it doesn't get to affect the ORM. Queries, working with models, transactions, and resolving relationships all continue to behave the way they always have. Asyncio happens only during calls to execute_sql(), the lowest common denominator for operations in Peewee. To accomplish this, I've borrowed the approach SQLAlchemy uses: greenlet (not gevent!) wraps the synchronous code in something that can be suspended and resumed when blocking occurs. Control can then be passed back and forth between our greenlet and the asyncio event loop without needing to rewrite everything top-to-bottom.

With this design, rather than rewriting Peewee to be async-friendly, we have a thin compatibility layer that bridges synchronous Peewee with the asyncio event loop and programming model.

This is a barebones example using the run() helper, the primary entry-point for asyncio code:

import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db')

class User(db.Model):
    name = TextField()

async def demo():
    async with db:
        await db.run(db.create_tables, [User])
        await db.run(User.create, name='Charlie')

        def get_user():
            return User.select().where(User.name == 'Charlie').get()

        user = await db.run(get_user)

        def bulk_insert():
            with db.atomic():
                iq = (User
                      .insert_many([{'name': 'Alice'}, {'name': 'Bob'}])
                      .returning(User))
                users = iq.execute()
                print('Added users: %s' % list(users))
        await db.run(bulk_insert)

        users = await db.run(list, User.select().order_by(User.name))
        for user in users:
            print(user.name)

    # Close the pool - the connection was released, but it still remains inside
    # the pool, so this ensures we are ready to shutdown completely.
    await db.close_pool()

asyncio.run(demo())

As you can see, Peewee still looks pretty much synchronous, and everything is passed into the asyncio bridge. If that isn't to your taste, though, here is the same example, but using a few asyncio helper methods:

import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db')

class User(db.Model):
    name = TextField()

async def demo():
    async with db:
        await db.acreate_tables([User])
        user = await db.run(User.create, name='Charlie')
        user_db = await db.get(User.select().where(User.name == 'Charlie'))
        assert user.name == user_db.name == 'Charlie'

        async with db.atomic():
            iq = (User
                  .insert_many([{'name': 'Alice'}, {'name': 'Bob'}])
                  .returning(User))
            users = await db.aexecute(iq)
            print('Added users: %s' % list(users))

        for user in await db.list(User.select().order_by(User.name)):
            print(user.name)

    await db.close_pool()

asyncio.run(demo())

When running Peewee ORM code, you can choose between the two execution patterns depending on how explicit you want to be. The most general-purpose method is to write your code synchronously and pass a callable to the run() method, e.g.:

def create_user():
    return User.create(username='huey')

user1 = await db.run(create_user)
user2 = await db.run(User.create, username='mickey')

Alternately, you can use the various async helpers where appropriate:

users = await db.list(User.select())
count = await db.scalar(User.select(fn.COUNT(User.id)))
user = await db.get(User.select().where(User.id == 1))
pk = await db.aexecute(User.insert(name='Huey'))

How does it work?

Synchronous ORM code runs inside a greenlet, and async I/O is bridged explicitly by Peewee using two helpers, greenlet_spawn() and await_(). The greenlet_spawn() helper runs synchronous code, but can be suspended and resumed in order to yield to the asyncio event loop. Yielding is done by the await_() helper, which suspends the greenlet and passes control to the asyncio coroutine.

Peewee wraps all this up in a general-purpose run() method, which is the entrypoint for pretty much all async operations:

from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    def work():
        print(db.execute_sql('select 1').fetchall())
    await db.run(work)

asyncio.run(demo())  # prints [(1,)]

The basic flow goes something like this:

  1. The above code eventually hits the db.run() method. This method calls the greenlet_spawn() function, creating a resumable coroutine wrapping our synchronous code.
  2. The greenlet begins executing the synchronous Peewee code.
  3. We call db.execute_sql('select 1').
  4. The async database implementation calls our special await_() helper, which switches control back to the event loop.
  5. The event loop awaits the coroutine, e.g. await conn.execute(...), awaiting the results from the cursor before handing them back.
  6. The result cursor is sent back to the greenlet, and the greenlet resumes.
  7. db.execute_sql() returns and the rest of the code continues normally.
  8. We call fetchall() on the result cursor, which returns all the rows loaded during (5).
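
To make the flow above concrete, here is a rough sketch of how a greenlet/asyncio bridge like this can be put together. This is not the pwasyncio source - the _BridgeGreenlet marker class is invented for the example, and the real greenlet_spawn() and await_() handle more edge cases (error propagation, in particular) - but it shows the core pattern borrowed from SQLAlchemy:

import greenlet

class _BridgeGreenlet(greenlet.greenlet):
    # Marker subclass so await_() can verify it is running inside a
    # greenlet started by greenlet_spawn().
    pass

def await_(coro):
    current = greenlet.getcurrent()
    if not isinstance(current, _BridgeGreenlet):
        raise RuntimeError('await_() called outside greenlet_spawn()')
    # Suspend this greenlet and hand the coroutine to the spawning
    # coroutine; the awaited result comes back as the return value.
    return current.parent.switch(coro)

async def greenlet_spawn(fn, *args, **kwargs):
    glet = _BridgeGreenlet(lambda: fn(*args, **kwargs))
    # Start the synchronous function. Each time it calls await_(), it
    # switches back here with an awaitable for us to drive on the event
    # loop; we then resume the greenlet with the result.
    result = glet.switch()
    while not glet.dead:
        result = glet.switch(await result)
    return result

Conceptually, db.run(fn, ...) then boils down to awaiting greenlet_spawn(fn, ...), plus the usual connection bookkeeping.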

If we try to run execute_sql() outside of the greenlet helper, Peewee will raise an exception:

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    print(db.execute_sql('select 1').fetchall())

# raises RuntimeError: await_() called outside greenlet_spawn()
asyncio.run(demo())

Peewee provides a number of async-ready helper methods for common operations, so the run() helper can be avoided:

from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    curs = await db.aexecute_sql('select 1')
    print(curs.fetchall())

asyncio.run(demo())  # prints [(1,)]

Note that obtaining the results from the cursor does not happen asynchronously (e.g. we do not call print(await curs.fetchall())). Internally Peewee does await fetching the results from the cursor, but the rows are all loaded before the cursor is returned to the caller. This ensures consistency with existing behavior, though in future versions we may add support for streaming cursor results (via Postgres server-side cursors).

Connections and Transactions

State management in Peewee is handled by a thread-local (or greenlet-local if you're using gevent) - this ensures that separate threads own their own connection and transaction state. In asyncio, particularly when mixed with the greenlet helper, neither of these options is available. Instead, Peewee uses a task-local, which ensures each asyncio task has its own connection and transaction stack. This shared-nothing architecture hopefully makes working with async Peewee easier, though there is an implicit limit on concurrency determined by the size of your connection pool and how quickly connections are released back into it.
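
To give a feel for what task-local state means in practice, here is a minimal sketch using contextvars. It illustrates the general idea rather than the pwasyncio internals (get_state() and the shape of the state dict are invented for the example). Each asyncio task runs in its own copy of the context, so state set inside one task is invisible to its siblings:

import asyncio
import contextvars

_state = contextvars.ContextVar('pw_state')

def get_state():
    # Lazily create per-task state; each asyncio task runs in a copy of
    # the current context, so set() here does not leak to other tasks.
    try:
        return _state.get()
    except LookupError:
        state = {'conn': None, 'transactions': []}
        _state.set(state)
        return state

async def worker(n):
    get_state()['conn'] = 'connection-%s' % n
    await asyncio.sleep(0)  # Yield to the event loop.
    # Even after other tasks have run, this task still sees its own state.
    assert get_state()['conn'] == 'connection-%s' % n

async def main():
    await asyncio.gather(*(worker(i) for i in range(3)))

asyncio.run(main())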

Transactions continue to work the same whether you're inside a coroutine or not:

async def create_users_async():
    async with db.atomic():
        await db.run(User.create, name='u1')
        await db.run(User.create, name='u2')

def create_users_sync():
    with db.atomic():
        User.create(name='u3')
        User.create(name='u4')

await create_users_async()
await db.run(create_users_sync)

Caveats

There are limitations to what can be achieved with this approach. The main one I foresee causing problems is lazy foreign-key resolution. Consider this example:

tweet = await db.get(Tweet.select())
print(tweet.user.name)  # Fails.
# RuntimeError: await_() called outside greenlet_spawn()

The relationship tweet.user was not explicitly fetched, so Peewee attempts to issue a SELECT query to load the related user. Since we are no longer operating inside the greenlet-bridged environment, this fails.

One solution is to resolve foreign keys inside db.run():

print(await db.run(lambda: tweet.user.name))

Even better is to select the related object explicitly:

query = Tweet.select(Tweet, User).join(User)
tweet = await db.get(query)
print(tweet.user.name)  # OK, no extra SELECT required.

In a similar way, iterating a back-reference like user.tweet_set issues a query behind the scenes, which runs into the same problem:

for tweet in user.tweet_set:
    print(tweet.message)
# RuntimeError: await_() called outside greenlet_spawn()

As above, there are a few ways to accomplish this:

# Use the db.run() helper:
tweets = await db.run(list, user.tweet_set)
for tweet in tweets:
    print(tweet.message)

# Use the db.list() helper:
for tweet in await db.list(user.tweet_set):
    print(tweet.message)

# Use prefetch (not a great fit, but just to demonstrate):
user_query = User.select().where(User.id == user.id)
tweet_query = Tweet.select()
user, = await db.run(prefetch, user_query, tweet_query)
for tweet in user.tweet_set:
    print(tweet.message)

Future

Over time, the surface of the asyncio API will probably grow to encompass more use-cases where an async helper would make sense. My desire was to start small, though - it's the spirit of Peewee, after all.
