Experimenting with an analytics web-service using python and cassandra

The other day I was poking around my google analytics account and thought it would be a fun project to see if I could collect "analytics"-type data myself. I recalled that the Apache Cassandra project was supposed to use a data model similar to Google's BigTable so I decided to use it for this project. The BigTable data model turned out to be a good fit for this project once I got over some of the intricacies of dealing with time-series data in Cassandra. In this post I'll talk about how I went about modelling, collecting, and finally analyzing basic page-view data I collected from this very blog.

Modelling time-series with Cassandra

Cassandra (like Bigtable) is defined as "a sparse, distributed, persistent multi-dimensional sorted map." As a python developer, the way I've gone about picturing this is a large dictionary containing strings as keys and OrderedDict as values (which is incidentally how pycassa models data):

from collections import OrderedDict

# column family "Users"
users = {
    'coleifer@example.com': OrderedDict([
        ('name', 'Charles Leifer'),
        ('occupation', 'Programmer'),
    ]),
    'someone@example.com': OrderedDict([
        ('name', 'Someone Else'),
    ]),
}

Just to point out a couple things:

So, how to go about modelling page-views?

Attempt #1: screw it let's sort the keys

My initial attempt got shot down first by a number of blog posts, then by the users of #cassandra on freenode. I intended to use timestamps as the row keys, then store the page-view data as the columns. To do this, I needed to edit the cassandra conf to use the ByteOrderedPartitioner and tell cassandra that my keys were timestamps so it would order them appropriately. There are valid use cases for the ordered partitioner, but since Cassandra gives you column-level ordering for free it seemed like a bad idea to continue down this path. Additionally, a single row can contain (theoretically) something like 2B column name/value pairs, making the "columns" (the inner dictionary) an attractive place to store page-views.

Attempt #2: screw it let's use supercolumns

My second attempt was not a whole lot better. I simply "pushed" everything down a level and used an "account id" as the row key. This gave me something like:

PageViews = {
    '<account id>': OrderedDict(
        (timestamp1, <PAGE VIEW DATA>),
        (timestamp2, <PAGE VIEW DATA>),
    )
}

Cassandra has this interesting feature called "super-columns" (wtf) which allows you to nest columns within columns. I decided to make PageView a "super-column" so my data model looked like:

PageViews = {
    '<account id>': OrderedDict(
        # the columns themselves contain dictionaries as their values
        (timestamp1, {'url': '...', 'title': '...', 'ip': '...'}),
        (timestamp2, {'url': '...', 'title': '...', 'ip': '...'}),
    )
}

Then I read that supercolumns were actually a pretty bad idea and would be replaced with composite columns. The other problem is that a single account's row can grow without bounds in this schema. If a site got on average 10 page-views/second, that would be 3600 * 24 * 10 == 864,000 new columns a day...that's a pretty big row to be slicing and dicing and moving in/out of memory.

Attempt #3: let's shard on date and serialize everything

My next attempt is what I ended up sticking with. Because I have multiple sites I wanted to keep the data for each in uniquely named rows. I ended up using a key composed of an account ID and a date shard (one row per account per day, but could be modified to fit your needs). Then, to fix the problem of supercolumns, I simply serialized all the page-view data using msgpack. It looks like this:

PageViews = {
    '20120801.<account_id>': OrderedDict(
        (timestamp1, <serialized pageview data>),
        (timestamp2, <serialized pageview data>),
    ),
    '20120731.<account_id>': OrderedDict(
        ...
    ),

}

The data cannot be queried by value this way (e.g. you can't ask cassandra for all page-views of a particular URL, since it only sees a serialized blob), but it is very easy to retrieve multiple days' worth of data (the rows), and in each row to retrieve slices of various intervals for a given account. Later I'll show how I threw together a tiny map/reduce helper to process the actual serialized page-view data.
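To make the layout concrete, here is a minimal in-memory sketch of the same idea using only the standard library. It is not how Cassandra stores anything; the names `page_views`, `row_key`, `record` and `slice_row` are made up for illustration, and `json` stands in for msgpack.

```python
import bisect
import datetime
import json
from collections import defaultdict

# row key -> list of (timestamp, serialized page-view), kept sorted by timestamp
page_views = defaultdict(list)

def row_key(account_id, when):
    """One row per account per day, e.g. '20120801.acct-1'."""
    return '%s.%s' % (when.strftime('%Y%m%d'), account_id)

def record(account_id, when, **data):
    # json is a stand-in for msgpack so the sketch has no dependencies
    blob = json.dumps(data, sort_keys=True)
    bisect.insort(page_views[row_key(account_id, when)], (when, blob))

def slice_row(account_id, start, finish):
    """Page-views with start <= timestamp <= finish, read from start's day row."""
    row = page_views.get(row_key(account_id, start), [])
    return [(ts, json.loads(blob)) for ts, blob in row if start <= ts <= finish]
```

Each day gets its own row, so a busy account never piles everything into one ever-growing row, and a time-range query only has to touch the rows for the days it covers.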

An equally viable alternative would be to store the account id in a composite column alongside the timestamp.

Collecting the data (this was fun)

I remember hearing of how people had used 1-pixel gifs to retrieve analytics data, and this works surprisingly well. I'm pretty sure you can't generally push results from the client using ajax because of cross-domain restrictions, so a workaround is to have some javascript dynamically create an image. The image's URL carries a payload of data about the current page, such as the URL, title, referrer, etc. The browser goes out to load the image at the given URL, which is conveniently located on your server, and your server decodes the page-view information, stores it, and returns a single-pixel gif. On a basic level, this is what google analytics does.
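For reference, a single-pixel gif is tiny enough to keep inline as a constant. The sketch below assumes a commonly used 1x1 transparent GIF (not necessarily the exact bytes I serve), and `beacon_response` is a hypothetical helper that just pairs the body with the headers you'd want on it.

```python
import base64

# A commonly used 1x1 transparent GIF, base64-encoded (42 bytes once decoded).
BEACON = base64.b64decode(
    'R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7')

def beacon_response():
    """Return (body, headers) for the tracking pixel."""
    headers = {
        'Content-Type': 'image/gif',
        'Content-Length': str(len(BEACON)),
        # ask the browser not to serve this from cache,
        # otherwise repeat views may never reach the server
        'Cache-Control': 'private, no-cache',
    }
    return BEACON, headers
```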

Analytics Flow

To serve the javascript and gif, as well as coordinate the storage, I chose to use flask. The app would be responsible for 2 views, the request for the javascript file and the request for the gif.

from flask import Flask, request, Response

app = Flask(__name__)


@app.route('/a.gif')
def analyze():
    # pull data out of args & headers
    data = <parse data from request.args and request.headers>

    # store data in cassandra
    <store data>(**data)

    response = Response(<single pixel gif>, mimetype='image/gif')

    # note the no-cache headers on the response
    response.headers['Cache-Control'] = 'private, no-cache'
    return response

@app.route('/a.js')
def analytics_code():
    return Response(<javascript>, mimetype='text/javascript')

The gif and javascript were stored in local variables. The javascript source itself looks like this:

(function() {
    var doc = document,
        img = new Image,
        esc = encodeURIComponent;
    img.src = 'http://analytics.xxx/a.gif' +
        '?url=' + esc(doc.location.href) +
        '&ref=' + esc(doc.referrer) +
        '&title=' + esc(doc.title);
})();

The img that gets dynamically created has a src attribute that looks something like this (would be url encoded):

src=http://analytics.xxx/a.gif?url=http://myblog/some-post/&title=Some post&ref=..

The actual request from the client's browser for the image contains other interesting info like their IP address and any request headers the browser sends like language and user-agent.
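To see the round trip from the server's point of view, here is a small standard-library sketch that builds a beacon URL the way the javascript would and then decodes it again. The helper names `build_beacon_url` and `parse_beacon_url` are invented for this example; in the flask app the decoding is simply `request.args`.

```python
from urllib.parse import parse_qs, quote, urlencode, urlsplit

def build_beacon_url(base, url, ref, title):
    params = {'url': url, 'ref': ref, 'title': title}
    # quote_via=quote (with urlencode's default safe='') escapes '/' and
    # spaces as %2F and %20, close to what encodeURIComponent produces
    return base + '?' + urlencode(params, quote_via=quote)

def parse_beacon_url(beacon_url):
    # parse_qs returns a list of values per name; keep the first of each
    query = parse_qs(urlsplit(beacon_url).query, keep_blank_values=True)
    return {name: values[0] for name, values in query.items()}
```

Calling `parse_beacon_url(build_beacon_url('http://analytics.xxx/a.gif', 'http://myblog/some-post/', '', 'Some post'))` gives back the original url, an empty referrer and the title.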

Here is a sampling of the code I used to extract the data and store it:

import datetime

import gevent
import msgpack
from pycassa.util import convert_time_to_uuid

# PageViews (the pycassa column family) and BEACON (the gif bytes)
# are defined elsewhere in the app

@app.route('/a.gif')
def analyze():
    # pull parameters off the request and the headers
    args, headers = request.args, request.headers
    data = dict(
        title=args.get('title', ''),
        # i'm using nginx in front; fall back to the socket address otherwise
        ip=headers.get('x-real-ip', request.remote_addr),
        referrer=args.get('ref', ''),
        user_agent=headers.get('user-agent', ''),
        language=headers.get('accept-language', '').split(',')[0],
    )

    acct_id = str(args.get('id', ''))
    url = args.get('url') or ''

    # spawn a green thread to store the data
    gevent.spawn(store_data, acct_id, url, **data)

    response = Response(BEACON, mimetype='image/gif')
    response.headers['Cache-Control'] = 'private, no-cache'
    return response

def store_data(acct_id, url, **data):
    # add the url to the data
    data['url'] = url

    cur_time = datetime.datetime.utcnow()
    time_uuid = convert_time_to_uuid(cur_time)

    # one row per account per (UTC) day
    row_key = '%s.%s' % (cur_time.strftime('%Y%m%d'), acct_id)

    # the column name is the time uuid, the value is the serialized page-view
    PageViews.insert(row_key, {
        time_uuid: msgpack.packb(data)
    })
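The column names above are time-based (version 1) UUIDs, which is what lets cassandra keep the columns ordered by time. As a rough illustration of what "turning a time into a UUID" means, here is a standard-library sketch. It is not pycassa's implementation; `time_to_uuid1` and `uuid1_to_time` are hypothetical names, and the clock sequence and node are fixed at zero to keep the example deterministic (a real generator would vary them to avoid collisions).

```python
import datetime
import uuid

# 100-nanosecond intervals between the UUID epoch (1582-10-15)
# and the Unix epoch (1970-01-01)
UUID_EPOCH_OFFSET = 0x01B21DD213814000
UNIX_EPOCH = datetime.datetime(1970, 1, 1)

def time_to_uuid1(dt, clock_seq=0, node=0):
    """Build a version-1 UUID whose timestamp field encodes the UTC datetime dt."""
    delta = dt - UNIX_EPOCH
    micros = (delta.days * 86400 + delta.seconds) * 10**6 + delta.microseconds
    ts = micros * 10 + UUID_EPOCH_OFFSET
    time_low = ts & 0xFFFFFFFF
    time_mid = (ts >> 32) & 0xFFFF
    time_hi_version = ((ts >> 48) & 0x0FFF) | 0x1000          # version 1
    clock_seq_hi_variant = ((clock_seq >> 8) & 0x3F) | 0x80   # RFC 4122 variant
    clock_seq_low = clock_seq & 0xFF
    return uuid.UUID(fields=(time_low, time_mid, time_hi_version,
                             clock_seq_hi_variant, clock_seq_low, node))

def uuid1_to_time(u):
    """Recover the UTC datetime stored in a version-1 UUID."""
    micros = (u.time - UUID_EPOCH_OFFSET) // 10
    return UNIX_EPOCH + datetime.timedelta(microseconds=micros)
```

Note that the timestamp is split across three fields with the low bits first, so sorting these UUIDs as plain bytes or integers does not sort them by time; the ordering has to come from a comparator that looks at the embedded timestamp.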

Analyzing the data to get simple counts

Suppose I wanted to get counts of page-views for a single day, grouped by URL. It's very easy to iterate over a slice of columns and store counts:

counts = {}
day = datetime.datetime(2012, 8, 1)
key = '%s.%s' % (day.strftime('%Y%m%d'), '<account id>')

query = PageViews.xget(key) # query is a generator

for (ts, data) in query:
    data = msgpack.unpackb(data)
    counts.setdefault(data['url'], 0)
    counts[data['url']] += 1
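The counting itself can be tightened up with `collections.Counter`. A minimal sketch, assuming the columns have already been deserialized into dicts (`count_by_url` is a made-up helper name):

```python
from collections import Counter

def count_by_url(columns):
    """columns: iterable of (timestamp, page-view dict) pairs."""
    return Counter(data['url'] for _ts, data in columns)
```

A `Counter` behaves like a dict for lookups, and `most_common(n)` gives the top pages without any extra sorting code.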

If I just wanted a couple of hours' worth of data, I could specify a "column_start" and "column_finish" indicating the times I wanted to query:

start = datetime.datetime(2012, 8, 1, 7, 0) # 7am
finish = start + datetime.timedelta(seconds=2*60*60) # 9am

query = PageViews.xget(key, column_start=start, column_finish=finish)

This type of analysis lends itself well to a map/reduce workflow. I wrote up 2 little python helpers to make this easier. The first generates lists of keys for a given date range, the second wraps the map/reduce process to avoid having to write duplicate "deserialization" code.

import datetime
from functools import reduce

import msgpack
from pycassa import NotFoundException
from pycassa.util import convert_time_to_uuid, convert_uuid_to_time

def prep_query(id, start_date, end_date):
    keys = []
    start_uuid = convert_time_to_uuid(start_date)
    end_uuid = convert_time_to_uuid(end_date)

    # one row key per day in the range
    while start_date <= end_date:
        keys.append('%s.%s' % (start_date.strftime('%Y%m%d'), id))
        start_date += datetime.timedelta(days=1)

    return keys, start_uuid, end_uuid

def map_reduce(id, start_date, end_date, map_fn, reduce_fn, initializer=None):
    """
    Very primitive map/reduce -- map functions will receive a generator that
    iterates over pageview columns.  The generator yields tuples of the form:

        (account_id, datetime of pageview, pageview data)
    """
    keys, column_start, column_finish = prep_query(id, start_date, end_date)

    # wrap the xget operation for passing in to the map_fn
    def wrapped(k):
        # xget is a generator, so any lookup error surfaces while iterating,
        # not when the generator is created -- keep the loop inside the try
        try:
            gen = PageViews.xget(k, column_start=column_start, column_finish=column_finish)
            for ts, data in gen:
                converted = convert_uuid_to_time(ts)
                unpacked = msgpack.unpackb(data)
                # rows are keyed by UTC date, so convert back to UTC as well
                yield (id, datetime.datetime.utcfromtimestamp(converted), unpacked)
        except NotFoundException:
            pass

    mapped_results = [map_fn(wrapped(k)) for k in keys]

    return reduce(reduce_fn, mapped_results, initializer)

Again, here is the "page-view counter" code rewritten to use our helper functions. While this example is lengthier and less efficient than the example above, it shows the potential for parallelizing this kind of work:

def map_fn(columns):
    results = []
    for id, ts, data in columns:
        results.append((data['url'], 1))
    return results

def reduce_fn(accum, item):
    # item is the list of (url, count) pairs produced by map_fn for one row
    for url, ct in item:
        accum.setdefault(url, 0)
        accum[url] += ct
    return accum

map_reduce(<acct id>, <start_date>, <end_date>, map_fn, reduce_fn, {})
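As a sketch of what the parallel version could look like, the map step can be handed to a thread pool, since each row is processed independently. This is an illustration only: it works on plain in-memory lists rather than cassandra rows, and `parallel_map_reduce`, `count_urls` and `merge_counts` are names invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

def count_urls(columns):
    """Map step: count page-views per url within a single row."""
    counts = {}
    for _account_id, _ts, data in columns:
        counts[data['url']] = counts.get(data['url'], 0) + 1
    return counts

def merge_counts(accum, partial):
    """Reduce step: fold one row's counts into the running totals."""
    for url, ct in partial.items():
        accum[url] = accum.get(url, 0) + ct
    return accum

def parallel_map_reduce(rows, map_fn, reduce_fn, initializer, max_workers=4):
    # each row is mapped on its own worker; the reduce runs serially afterwards
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        mapped = list(pool.map(map_fn, rows))
    return reduce(reduce_fn, mapped, initializer)
```

Threads suit this because the real map step would spend most of its time waiting on the database; for CPU-heavy map functions a process pool would be the more natural choice.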

Your map functions can do all sorts of interesting things, like:

Ideas for improvements and interesting links

One thing I noticed while looking at the google analytics "dance" is that it does some things with cookies to track new versus returning visitors -- this would be a pretty neat feature. It might also be cool to denormalize page-view data and store page-views by IP (especially if you have a large number of sites) and try to isolate individual users that way (I'm sure Google is correlating all of the information it collects to generate a "unique" picture of an individual's browsing history, based on simple things like user-agent and IP address).

It would be great for the javascript library to expose a lightweight API for folks on the client-side to send custom events. Google does this with their analytics platform and it is regularly used to measure ad campaigns.

Finally, this code is very naive! It's my first experiment with Cassandra and very likely there are interesting features I've overlooked in my simplistic data model. If anyone out there has suggestions on improving it or has had success with alternate models I'd love to hear about it.

Here are a couple links and references I found helpful while working on this:

If you're interested in more projects like this, check out the saturday-morning hack posts.

Hope you enjoyed reading! Here's a lil' screenshot from a web frontend I built atop this:

Analytics Chart

Comments (2)

Simon Rumble | aug 21 2012, at 02:05am

You should check out SnowPlow which is a loosely-coupled web analytics tool set designed to use Hadoop for the data processing. Though each component could be swapped out on demand.

http://snowplowanalytics.com/

Terry Nathan | aug 16 2012, at 05:28pm

Interesting post! I'd like to start experimenting with a lot of the things you've mentioned here e.g. map-reduce, home-grown analytics, and I'll be sure to refer back to this when I get to it. By the way, you can use "from collections import defaultdict" and then d = defaultdict(int) to do the counting of urls you were doing earlier.

