Ditching the Task Queue for Gevent
Task queues are frequently deployed alongside websites to do background processing outside the normal request/response cycle. In the past I've used them for things like sending emails, generating thumbnails, warming caches, or periodically fetching remote resources. By pushing that work out of the request/response cycle, you can increase the throughput (and responsiveness) of your web application.
Depending on your workload, though, it may be possible to move your task processing into the same process as your web server. In this post I'll describe how I did just that using gevent, though the technique would probably work well with a number of different WSGI servers.
Setup
The setup I use is stupid simple and works well for the handful of sites I run. I have Nginx sitting out front listening to the internet, and then I reverse proxy back to a gevent WSGIServer. My WSGI application wrapper looks like this:
# Apply patches to stuff that blocks to enable gevent magic.
from gevent import monkey; monkey.patch_all()
import sys
sys.stdout = sys.stderr # Redirect output to stderr.
from my_app import app # Flask app, whatever...
# gevent.wsgi was removed in gevent 1.3; WSGIServer now lives in gevent.pywsgi.
from gevent.pywsgi import WSGIServer
WSGIServer(('127.0.0.1', 8888), app).serve_forever()
Here's a bite-sized Nginx config that should work with the above WSGI server:
upstream myapp {
    server 127.0.0.1:8888;
}
server {
    listen 80;
    server_name my_app.com;
    location /static/ {
        root /path/to/static/assets/;
    }
    location / {
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_pass http://myapp;
    }
}
Concurrency
Since requests to my WSGI application are being served by a single process, how is it able to process multiple requests concurrently? The answer lies in the clever monkey-patching that gevent applies where Python would ordinarily block on I/O. If an operation would block, the currently-running code yields control (via behind-the-scenes coroutines) to a central coordinator which can then select a different coroutine to wake up. gevent uses a fast event loop so it can be smart about when it decides to wake someone up.
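To make the hand-off concrete, here is a minimal sketch (not part of the setup above) in which two greenlets take turns inside a single OS thread. The names worker and run_demo are made up for illustration, and gevent.sleep(0) stands in for the blocking I/O call that would normally trigger the switch.

```python
import gevent


def worker(name, steps, events):
    """Record each step, then hand control back to the hub."""
    for step in range(steps):
        events.append((name, step))
        # Stand-in for blocking I/O: explicitly yield so another greenlet can run.
        gevent.sleep(0)


def run_demo(steps=3):
    """Run two workers in one OS thread and return the order they executed in."""
    events = []
    jobs = [
        gevent.spawn(worker, 'a', steps, events),
        gevent.spawn(worker, 'b', steps, events),
    ]
    gevent.joinall(jobs)
    return events


if __name__ == '__main__':
    # The steps of 'a' and 'b' come out interleaved rather than back to back.
    print(run_demo())
```

Neither worker finishes before the other starts: each call that yields gives the hub a chance to resume whichever greenlet is ready next.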
Even if we were using a pool of OS threads to handle requests, because of the global interpreter lock, we're limited to running Python bytecode serially. The differences between a WSGI server like CherryPy (thread-pool) and gevent (greenlets/coroutines) are two-fold:
- The Python interpreter can preemptively switch execution from one running thread to another, which it does after a certain number of bytecode instructions (Python 2), after a fixed time slice (Python 3.2 and later), or when a thread waits on I/O. This means that CPU-bound tasks won't finish any faster than if they were run single-threaded -- it just means that one bad actor won't clog up the whole works, since Python will transfer control to another thread after a while.
- gevent threads are much lighter-weight than OS threads, so it's possible to run a lot more of them efficiently. Furthermore, and this is speculation, I would guess that gevent is smarter about context-switches than the Python interpreter is, by virtue of registering event handlers for I/O operations. Fewer/cheaper context switches would, presumably, lead to more efficient execution when I/O is the limiting factor.
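As a rough illustration of how cheap greenlets are, the sketch below spawns a few thousand of them at once, which would be expensive to do with OS threads. fake_request and run_many are hypothetical names, and the gevent.sleep() call only simulates waiting on a socket, so treat the timing as indicative rather than as a benchmark.

```python
import time

import gevent


def fake_request(n, delay):
    """Pretend to wait on the network, then return a result."""
    gevent.sleep(delay)
    return n


def run_many(count=5000, delay=0.1):
    """Spawn `count` greenlets and return (sum of results, elapsed seconds)."""
    start = time.time()
    jobs = [gevent.spawn(fake_request, n, delay) for n in range(count)]
    gevent.joinall(jobs)
    elapsed = time.time() - start
    return sum(job.value for job in jobs), elapsed


if __name__ == '__main__':
    total, elapsed = run_many()
    # The waits overlap, so elapsed should be far below count * delay.
    print(total, round(elapsed, 2))
```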
Given these differences, it seems to me that greenlets are generally a better proposition for handling web requests under the following conditions:
- You can get past the aesthetics of having to monkey-patch socket (seriously, this trips up a lot of people...just look at the garbage fire that is asyncio and you'll see why this is number one).
- You don't do a lot of CPU-intensive operations in your request handlers. This is important because, remember, gevent cannot pre-empt a long-running block of code unless it performs I/O (or explicitly yields control via a call to gevent.sleep()).
An additional benefit gevent has over something like asyncio is that you don't need to sprinkle pixie-shit all over your code and use purpose-built libraries for everything. Apply the patch and everything already works!
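The point about CPU-intensive handlers can be seen in a small sketch. The functions crunch, quick and run are invented for this example; crunch does pure computation, and only yields when asked to.

```python
import gevent


def crunch(log, cooperative):
    """Three chunks of CPU-only work, optionally yielding between chunks."""
    for _ in range(3):
        sum(range(100000))  # Stand-in for CPU-bound work; performs no I/O.
        log.append('crunch')
        if cooperative:
            gevent.sleep(0)  # Explicitly give other greenlets a turn.


def quick(log):
    """A short task that only needs a moment of CPU time."""
    log.append('quick')


def run(cooperative):
    """Run both greenlets and return the order in which work was logged."""
    log = []
    gevent.joinall([
        gevent.spawn(crunch, log, cooperative),
        gevent.spawn(quick, log),
    ])
    return log


if __name__ == '__main__':
    print(run(cooperative=False))  # quick has to wait for all of crunch.
    print(run(cooperative=True))   # quick gets in before crunch is done.
```

Without the explicit yield, the quick task is stuck behind the whole computation, which is what would happen to other requests behind a CPU-heavy handler.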
Asynchronous Other Stuff
Earlier I described how gevent consists of a central coordinator which manages a pool of coroutines. With a WSGI application, these coroutines will be your request handlers. But there's no reason they couldn't be other things as well, right?
Thanks to the monkey-patching, whenever a running greenlet would do blocking I/O, control is yielded to the central Hub which can then wake-up any other paused greenlet that has data ready. Besides request handlers, gevent can be a good fit for asynchronous tasks that are I/O-bound. For example:
- Sending emails
- Interacting with remote APIs
- Working with S3, Redis, networked databases, etc.
- Long-running socket operations (e.g. gevent-socketio)
- Spawning subprocesses
Things that might not work well:
- CPU-intensive data processing (report generation, thumbnailing, etc)
- File I/O (with some caveats)
As you'll see later on in the post, gevent provides some facilities to help work around these limitations.
Implementation
A common feature on many webpages is to have a contact page which sends an email to the site administrator. Sending email necessarily does some socket I/O, so it's a good candidate for cooperative multi-tasking. Let's look at how we might implement such a view to leverage gevent for sending the email.
First let's look at a typical blocking setup:
from flask import flash, redirect, render_template, request, url_for
from my_app import app

@app.route('/contact/', methods=['GET', 'POST'])
def contact():
    if request.method == 'POST':
        name = request.form['name']
        email = request.form['email']
        message = request.form['message']
        # Send the email -- wait until it's finished sending before returning
        # the HTTP response.
        send_contact(name, email, message)
        flash('Your message has been sent', 'success')
        return redirect(url_for('contact'))
    return render_template('contact.html')

def send_contact(name, email, message):
    # send the actual email...
    pass
To make the send_contact call asynchronous, we'll make the following small modification:
import gevent

@app.route('/contact/', methods=['GET', 'POST'])
def contact():
    if request.method == 'POST':
        name = request.form['name']
        email = request.form['email']
        message = request.form['message']
        # Use gevent.spawn() which accepts the function/callable and then the
        # list of arguments/keyword-arguments to call the function with. This
        # will return quickly and not block sending the response while the
        # email is being sent.
        gevent.spawn(send_contact, name, email, message)
        flash('Your message has been sent', 'success')
        return redirect(url_for('contact'))
    return render_template('contact.html')
This small change is all that is required to prevent the call to send_contact from blocking the sending of the response.
It's important to note, however, that because we applied a monkey-patch to all socket operations, even if we had left the original version intact our server would still be able to handle other requests/tasks while the email was being sent. The only difference between the first version and the second (which uses gevent.spawn) is that the first will wait until the email is sent before returning a response, while the second spawns a completely new coroutine to handle sending the email and returns the response without waiting for the result. In both cases, though, other requests/tasks can be run concurrently.
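One consequence of not waiting for the result is that the request handler never sees an exception raised inside the spawned greenlet. A possible way to keep track of failures, sketched below, is Greenlet.link_exception(), which calls a function if the greenlet dies with an error. The spawn_logged helper, the failures list and the always-failing send_contact stub are assumptions made up for this example.

```python
import logging

import gevent

logger = logging.getLogger('background')
failures = []  # Kept only so the example can be inspected afterwards.


def on_failure(greenlet):
    """Called by gevent with the dead greenlet when it raised an exception."""
    failures.append(greenlet.exception)
    logger.error('background task failed: %r', greenlet.exception)


def spawn_logged(func, *args, **kwargs):
    """Hypothetical helper: spawn func and report any exception it raises."""
    greenlet = gevent.spawn(func, *args, **kwargs)
    greenlet.link_exception(on_failure)
    return greenlet


def send_contact(name, email, message):
    # Stub that always fails, standing in for a broken SMTP connection.
    raise RuntimeError('could not reach mail server')


if __name__ == '__main__':
    task = spawn_logged(send_contact, 'Alice', 'alice@example.com', 'Hi!')
    task.join()  # Only for the demo; a request handler would not wait.
    print(failures)
```

In the view, gevent.spawn(send_contact, ...) could be swapped for spawn_logged(send_contact, ...) without changing when the response is returned.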
Scheduling
In addition to spawning greenlets to run immediately, gevent provides an additional helper spawn_later() which will schedule the greenlet to run after a certain number of seconds. To use this form:
# will call run_in_an_hour('some arg', 'another arg', foo=True) in ~1 hour
gevent.spawn_later(3600, run_in_an_hour, 'some arg', 'another arg', foo=True)
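spawn_later() runs its function once. For work that should repeat, one simple approach is a greenlet that loops and sleeps between calls. The run_every helper below is a hypothetical sketch rather than a gevent API, and it makes no attempt to correct for drift or to survive an exception raised by the task.

```python
import gevent


def run_every(interval, func, *args, **kwargs):
    """Hypothetical helper: call func roughly every `interval` seconds.

    Returns the greenlet running the loop; call .kill() on it to stop.
    """
    def loop():
        while True:
            gevent.sleep(interval)  # Yields to the hub while waiting.
            func(*args, **kwargs)
    return gevent.spawn(loop)


if __name__ == '__main__':
    ticks = []
    task = run_every(0.1, ticks.append, 'tick')
    gevent.sleep(0.35)  # Let the loop run a few times.
    task.kill()         # Stop the periodic task.
    print(len(ticks))
```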
Dealing with blocking operations
gevent also provides some nice facilities to make blocking operations more cooperative. Let's consider a function that generates a report and is computationally intensive. There are a couple ways we can modify this function so that it doesn't block other greenlets from running. Because gevent uses cooperative multi-tasking (rather than pre-emptive) we need to explicitly tell the Hub when we are yielding control. The monkey-patches automatically do this for us when I/O is performed, but when we're doing CPU-intensive things it's necessary to be explicit.
Here we'll use gevent.sleep() to yield control after every 10 rows, to allow other greenlets to run:
import datetime
import gevent

# Order is a database model defined elsewhere in the application.
def generate_report(ndays=7):
    cutoff = datetime.date.today() - datetime.timedelta(days=ndays)
    data = (Order
            .select()
            .where(Order.order_date >= cutoff)
            .order_by(Order.order_date))
    rows = []
    for i, order in enumerate(data):
        if i % 10 == 0:
            gevent.sleep() # Yield to the Hub.
        # Generate a row of the report...do some calculations, etc...
        rows.append(...)
    return rows
If you have a low-priority task, like generating a thumbnail, you can use gevent.idle() to wait until the event loop is completely idle before waking up and continuing execution.
import gevent
from PIL import Image  # Assumes the Pillow imaging library.

def make_thumbnail(filename, width, height=None):
    # Wait to generate the thumbnail until the event loop is completely idle.
    gevent.idle()
    # ... do the actual thumbnail generation ...
    img = Image.open(filename)
    img_w, img_h = img.size
    # ... etc ...
I hope these examples show that, even though you may have tasks that aren't a natural fit for cooperative multi-tasking, gevent makes it very easy to adapt your code.
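Another option, not shown in the examples above, is to hand blocking work to real OS threads through gevent.threadpool. The sketch below is one way this might look for file I/O; the pool size, file_checksum and file_checksum_async are all assumptions chosen for illustration.

```python
import hashlib

from gevent.threadpool import ThreadPool

# A small pool of real OS threads; the size is an arbitrary choice.
pool = ThreadPool(4)


def file_checksum(path):
    """Blocking work: read a file from disk and hash it."""
    digest = hashlib.sha256()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()


def file_checksum_async(path):
    """Run file_checksum in the pool; only the calling greenlet waits."""
    return pool.spawn(file_checksum, path).get()


if __name__ == '__main__':
    print(file_checksum_async(__file__))
```

While the worker thread reads from disk, the hub stays free to run other greenlets. For heavy pure-Python computation the global interpreter lock still applies, so a thread pool helps most when the blocking call releases it.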
Wrapping up
I hope you found the information in this post helpful. By ditching a separate task queue consumer, my web applications now use fewer system resources and are easier to deploy and manage.
If you're interested in a library that takes this concept a bit further, I've written a small extension to the huey task queue called MiniHuey that provides some additional facilities and APIs for running tasks within your application. Besides managing asynchronous tasks, MiniHuey also runs a scheduler that is capable of running cron-like tasks (tasks that run repeatedly at a specific interval) as well as tasks scheduled to run at a specific time in the future.
Comments (1)
miguel | nov 07 2017, at 08:39pm
For blocking operations it is still possible to make use of threads with gevent, either by excluding threads from the monkey-patching (monkey.patch_all(thread=False)), or by using gevent's thread pool (gevent.threadpool) which always uses real threads. If the operation involves very heavy CPU, I'm a big fan of gipc for gevent compatible multiprocessing.