Introduction to Redis streams with Python

photos/redis-logo.png

Redis 5.0 contains, among lots of fixes and improvements, a new data-type and set of commands for working with persistent, append-only streams.

Redis streams are a complex topic, so I won't be covering every aspect of the APIs, but hopefully after reading this post you'll have a feel for how they work and whether they might be useful in your own projects.

Streams share some superficial similarities with list operations and pub/sub, with some important differences. For instance, task queues are commonly implemented by having multiple workers issue blocking-pop operations on a list. The benefit of this approach is that messages are distributed evenly among the available workers. Downsides, however, are:

Similarly, Redis pub/sub can be used to publish a stream of messages to any number of interested consumers. Pub/sub is limited by the fact that it is "fire and forget". There is no history, nor is there any indication that a message has been read.

Streams allow the implementation of more robust message processing workflows, thanks to the following features:

The rest of the post will show some examples of working with streams using the walrus Redis library. If you prefer to just read the code, this post is also available as an ipython notebook.

Getting started with streams

I maintain a Redis utility library named walrus that builds on and extends the Redis client from the redis-py package. In this way, you can use the Database class provided by walrus as a drop-in replacement for the usual Redis client without having to learn a new library up-front. I've added support for the new streams APIs, since they aren't available in redis-py at the time of writing (note: streams support was merged November 1st, 2018). To follow along, you can install walrus using pip:

$ pip install walrus

Or to install the very latest code from the master branch:

$ pip install -e git+git@github.com:coleifer/walrus.git#egg=walrus

walrus supports low-level streams APIs, as well as offering high-level container types which are a bit easier to work with in Python.

I like to think of streams as having two modes of operation:

In the first section, I'll cover stream operations in "standalone" mode. The rest of the post will detail working with consumer groups for stateful stream processing.

Basic operations

Streams are append-only data-structures that store a unique identifier (typically a timestamp) along with arbitrary key/value data. In standalone mode, streams behave much like every other data-structure in Redis. By this, I mean that they act as a dumb container: you append items, you read them, you delete them – everything happens explicitly. Streams support the following operations:

To get started with streams, we’ll create a Database instance and use it to instantiate a Stream:

from walrus import Database  # A subclass of the redis-py Redis client.

db = Database()
stream = db.Stream('stream-a')

When adding data to a stream, Redis can automatically provide you with a unique timestamp-based identifier, which is almost always what you want. When a new message is added, the message id is returned:

msgid = stream.add({'message': 'hello, streams'})
print(msgid)

# Prints something like:
# b'1539008591844-0'

Message ids generated by Redis consist of a timestamp, in milliseconds, along with a sequence number (for ordering messages that arrived at the same millisecond).

Let's add a couple more items so we have more data to work with:

msgid2 = stream.add({'message': 'message 2'})
msgid3 = stream.add({'message': 'message 3'})

Ranges of records can be read using slices. The message ids provided as the range endpoints are inclusive when using the range API:

# Get messages 2 and newer:
messages = stream[msgid2:]

# messages contains:
[(b'1539008914283-0', {b'message': b'message 2'}),
 (b'1539008918230-0', {b'message': b'message 3'})]

# We can use the "step" parameter to limit the number of records returned.
messages = stream[msgid::2]

# messages contains the first two messages:
[(b'1539008903588-0', {b'message': b'hello, stream'}),
 (b'1539008914283-0', {b'message': b'message 2'})]

# Get all messages in stream:
messages = list(stream)
[(b'1539008903588-0', {b'message': b'hello, stream'}),
 (b'1539008914283-0', {b'message': b'message 2'}),
 (b'1539008918230-0', {b'message': b'message 3'})]

The size of streams can be managed by deleting messages by id, or by "trimming" the stream, which removes the oldest messages. The desired size is specified when issuing a "trim" operation, though, due to the internal implementation of the stream data-structures, the size is considered approximate by default.

# Adding and deleting a message:
msgid4 = stream.xadd({'message': 'delete me'})
del stream[msgid4]

# How many items are in the stream?
print(len(stream))  # Prints 3.

To see how trimming works, let's create another stream and fill it with 1000 items, then request it to be trimmed to 10 items:

# Add 1000 items to "stream-2".
stream2 = db.Stream('stream-2')
for i in range(1000):
    stream2.add({'data': 'message-%s' % i})

# Trim stream-2 to (approximately) 10 most-recent messages.
nremoved = stream2.trim(10)
print(nremoved)
# 909
print(len(stream2))
# 91

# To trim to an exact number, specify `approximate=False`:
stream2.trim(10, approximate=False)  # Returns 81.
print(len(stream2))
# 10

Processing data in real-time

The previous examples show how to add, read and delete messages from streams. When processing a continuous stream of events, though, it may be desirable to block until messages are added. For this we can use the read() API, which supports blocking until messages become available.

# By default, calling `stream.read()` returns all messages in the stream:
stream.read()

# Returns:
[(b'1539008903588-0', {b'message': b'hello, stream'}),
 (b'1539008914283-0', {b'message': b'message 2'}),
 (b'1539008918230-0', {b'message': b'message 3'})]

We can pass a message id to read(), and unlike the slicing operations, this id is considered the "last-read message" and acts as an exclusive lower-bound:

# Read any messages newer than msgid2.
stream.read(last_id=msgid2)

# Returns:
[(b'1539008918230-0', {b'message': b'message 3'})]

# This returns None since there are no messages newer than msgid3.
stream.read(last_id=msgid3)

We can make read() blocking by specifying a special id, "$", and a timeout in milliseconds. To block forever, you can use block=0.

# This will block for 2 seconds, after which an empty list is returned
# (provided no messages are added while waiting).
stream.read(timeout=2000, last_id='$')

While its possible to build consumers using these APIs, the client is still responsible for keeping track of the last-read message ID and coming up with semantics for retrying failed messages, etc. In the next section, we'll see how consumer groups can greatly simplify building a stream processing pipeline.

Consumer groups

Consumer groups make it easy to implement robust message processing pipelines. Consumer groups allow applications to read from one or more streams, while keeping track of which messages were read, who read them, when they were last read, and whether they were successfully processed (acknowledged). Unacknowledged messages can be inspected and claimed, simplifying "retry" logic.

# Consumer groups require that a stream exist before the group can be
# created, so we have to add an empty message.
stream_keys = ['stream-a', 'stream-b', 'stream-c']
for stream in stream_keys:
    db.xadd(stream, {'data': ''})

# Create a consumer-group for streams a, b, and c. We will mark all
# messages as having been processed, so only messages added after the
# creation of the consumer-group will be read.
cg = db.consumer_group('cg-abc', stream_keys)
cg.create()  # Create the consumer group.
cg.set_id('$')

To read from all the streams in a consumer group, we can use the read() method. Since we marked all messages as read and have not added anything new since creating the consumer group, the return value is an empty list:

resp = cg.read()

# Returns an empty list:
[]

For convenience, walrus exposes the individual streams within a consumer group as attributes on the ConsumerGroup instance. Let's add some messages to streams a, b, and c:

cg.stream_a.add({'message': 'new a'})
cg.stream_b.add({'message': 'new for b'})
for i in range(10):
    cg.stream_c.add({'message': 'c-%s' % i})

Now let's try reading from the consumer group again. We'll pass count=1 so that we read no more than one message from each stream in the group:

# Read up to one message from each stream in the group.
cg.read(count=1)

# Returns:
[('stream-a', [(b'1539023088125-0', {b'message': b'new a'})]),
 ('stream-b', [(b'1539023088125-0', {b'message': b'new for b'})]),
 ('stream-c', [(b'1539023088126-0', {b'message': b'c-0'})])]

We've now read all the unread messages from streams a and b, but stream c still has messages. Calling read() again will give us the next unread message from stream c:

# Read up to 1 message from each stream in the group. Since
# we already read everything in streams a and b, we will only
# get the next unread message in stream c.
cg.read(count=1)

# Returns:
[('stream-c', [(b'1539023088126-1', {b'message': b'c-1'})])]

When using consumer groups, messages that are read need to be acknowledged. Let's look at the pending (read but unacknowledged) messages from stream a using the pending() method, which returns a list of metadata about each unacknowledged message:

# We read one message from stream a, so we should see one pending message.
cg.stream_a.pending()

# Returns a list of:
# [message id, consumer name, message age, delivery count]
[[b'1539023088125-0', b'cg-abc.c1', 22238, 1]]

To acknowledge receipt of a message and remove it from the pending list, use the ack() method on the consumer group stream:

# View the pending message list for stream a.
pending_list = cg.stream_a.pending()
msg_id = pending_list[0]['message_id']

# Acknowledge the message.
cg.stream_a.ack(msg_id)

# Returns number of pending messages successfully acknowledged:
1

Consumer groups have the concept of individual consumers. These might be workers in a process pool, for example. Note that the pending() call returned the consumer name as "cg-abc.c1". Walrus uses the consumer group name + ".c1" as the name for the default consumer name. To create another consumer within a given group, we can use the consumer() method:

# Create a second consumer within the consumer group.
cg2 = cg.consumer('cg-abc.c2')

Creating a new consumer within a consumer group does not affect the state of the group itself. Calling read() using our new consumer will pick up from the last-read message, as you would expect:

# Read from our consumer group using the new consumer. Recall
# that we read all the messages from streams a and b, and the
# first two messages in stream c.
cg2.read(count=1)

# Returns:
[('stream-c', [(b'1539023088126-2', {b'message': b'c-2'})])]

If we look at the pending message status for stream c, we will see that the first and second messages were read by the consumer "cg-abc.c1" and the third message was read by our new consumer, "cg-abc.c2":

# What messages have been read, but were not acknowledged, from stream c?
cg.stream_c.pending()

# Returns list of metadata, consisting of each pending message id, 
# consumer, message age, delivery count:
[{'message_id': b'1539023088126-0', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 51329, 'times_delivered': 1}],
 {'message_id': b'1539023088126-1', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 43772, 'times_delivered': 1},
 {'message_id': b'1539023088126-2', 'consumer': b'cg-abc.c2',
  'time_since_delivered': 5966, 'times_delivered': 1}]

Consumers can claim pending messages, which transfers ownership of the message and returns a list of (message id, data) tuples to the caller:

# Unpack the pending messages into a couple variables.
mc1, mc2, mc3 = cg.stream_c.pending()

# Claim the first message for consumer 2:
cg2.stream_c.claim(mc1['message_id'])

# Returns a list of (message id, data) tuples for the claimed messages:
[(b'1539023088126-0', {b'message': b'c-0'})]

Re-inspecting the pending messages for stream c, we can see that the consumer for the first message has changed and the message age has been reset:

# What messages are pending in stream c?
cg.stream_c.pending()

# Returns:
[{'message_id': b'1539023088126-0', 'consumer': b'cg-abc.c2',
  'time_since_delivered': 2168, 'times_delivered': 1},
 {'message_id': b'1539023088126-1', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 47141, 'times_delivered': 1},
 {'message_id': b'1539023088126-2', 'consumer': b'cg-abc.c2',
  'time_since_delivered': 9335, 'times_delivered': 1}]

Consumer groups can be created and destroyed without affecting the underlying data stored in the streams:

# Destroy the consumer group.
cg.destroy()

# All the messages are still in "stream-c":
len(db.Stream('stream-c'))
# Returns 10.

The individual streams within the consumer group support a number of useful APIs:

TimeSeries API

Redis automatically uses the millisecond timestamp plus a sequence number to uniquely identify messages added to a stream. This makes streams a natural fit for time-series data. To simplify working with streams as time-series in Python, you can use the special TimeSeries helper class, which acts just like the ConsumerGroup from the previous section with the exception that it can translate between Python datetime objects and message ids automatically.

To get started, we'll create a TimeSeries instance, specifying the stream keys, just like we did with ConsumerGroup:

# Create a time-series consumer group named "demo-ts" for the
# streams s1 and s2.
ts = db.time_series('demo-ts', ['s1', 's2'])

# Add dummy data and create the consumer group.
db.xadd('s1', {'': ''}, id='0-1')
db.xadd('s2', {'': ''}, id='0-1')
ts.create()
ts.set_id('$')  # Do not read the dummy items.

Let's add some messages to the time-series, one for each day between January 1st and 10th, 2018:

from datetime import datetime, timedelta

date = datetime(2018, 1, 1)
for i in range(10):
    ts.s1.add({'message': 's1-%s' % date}, id=date)
    date += timedelta(days=1)

We can read messages from the stream using the familiar slicing API. For example, to read 3 messages starting at January 2nd, 2018:

ts.s1[datetime(2018, 1, 2)::3]

# Returns messages for Jan 2nd - 4th:
[<Message s1 1514872800000-0: {'message': 's1-2018-01-02 00:00:00'}>,
 <Message s1 1514959200000-0: {'message': 's1-2018-01-03 00:00:00'}>,
 <Message s1 1515045600000-0: {'message': 's1-2018-01-04 00:00:00'}>]

Note that the values returned are Message objects. Message objects provide some convenience functions, such as extracting timestamp and sequence values from stream message ids:

for message in ts.s1[datetime(2018, 1, 1)::3]:
    print(message.stream, message.timestamp, message.sequence, message.data)

# Prints:
s1 2018-01-01 00:00:00 0 {'message': 's1-2018-01-01 00:00:00'}
s1 2018-01-02 00:00:00 0 {'message': 's1-2018-01-02 00:00:00'}
s1 2018-01-03 00:00:00 0 {'message': 's1-2018-01-03 00:00:00'}

Let's add some messages to stream "s2" as well:

date = datetime(2018, 1, 1)
for i in range(5):
    ts.s2.add({'message': 's2-%s' % date}, id=date)
    date += timedelta(days=1)

One difference between TimeSeries and ConsumerGroup is what happens when reading from multiple streams. ConsumerGroup returns a dictionary keyed by stream, along with a corresponding list of messages read from each stream. TimeSeries, however, returns a flat list of Message objects:

# Read up to 2 messages from each stream (s1 and s2):
messages = ts.read(count=2)

# "messages" is a list of messages from both streams:
[<Message s1 1514786400000-0: {'message': 's1-2018-01-01 00:00:00'}>,
 <Message s2 1514786400000-0: {'message': 's2-2018-01-01 00:00:00'}>,
 <Message s1 1514872800000-0: {'message': 's1-2018-01-02 00:00:00'}>,
 <Message s2 1514872800000-0: {'message': 's2-2018-01-02 00:00:00'}>]

When inspecting pending messages within a TimeSeries the message ids are unpacked into (datetime, seq) 2-tuples:

ts.s1.pending()

# Returns:
[((datetime.datetime(2018, 1, 1, 0, 0), 0), 'events-ts.c', 1578, 1),
 ((datetime.datetime(2018, 1, 2, 0, 0), 0), 'events-ts.c', 1578, 1)]

# Acknowledge the pending messages:
for msgts_seq, _, _, _ in ts.s1.pending():
    ts.s1.ack(msgts_seq)

We can set the last-read message id using a datetime:

ts.s1.set_id(datetime(2018, 1, 1))

# Next read will be 2018-01-02, ...
ts.s1.read(count=2)

# Returns:
[<Message s1 1514872800000-0: {'message': 's1-2018-01-02 00:00:00'}>,
 <Message s1 1514959200000-0: {'message': 's1-2018-01-03 00:00:00'}>]

As with ConsumerGroup, the TimeSeries helper provides stream-specific APIs for claiming unacknowledged messages, creating additional consumers, etc.

Learning more

The high-level overview covers the same material as found in this post, but will be kept up-to-date (whereas this blog may not always be!).

For API documentation for the stream container-types implemented in walrus:

You can also find the code from this post condensed nicely into an ipython notebook.

For more information on streams, I suggest reading the streams introduction on the Redis documentation site.

Comments (1)

Edward | oct 10 2018, at 02:10am

Great code. Great write up. Thanks!


Commenting has been closed.