Apache Hive is a data warehouse system built on top of Hadoop. Using SQL-like language you can query data stored in the Hadoop filesystem (HDFS). Those queries are then translated into Map Reduce jobs and executed on your cluster.

As an example we'll analyze tweets from the Twitter Streaming logs and calculate the top 5 hashtags per day which are associated with positive sentiment signals (smileys).

You can imagine how this can be expand this to simple sentiment analysis on your (potential) customer feedback.

Gather the data from Twitter streaming API
The JSON log lines from the Twitter Streaming API look like these:

      "created_at": "Sat Sep 10 22:23:38 +0000 2011",
      "id_str": "112652479837110273",
      "text": "@twitter meets @seepicturely at #tcdisrupt cc.@boscomonkey @episod http://t.co/6J2EgYM",
      "user": {
        "name": "Eoin McMillan ",
        "screen_name": "imeoin",

For now we only care about the "created_at" and "text" attributes. See detailed information on all available attributes at https://dev.twitter.com/docs/platform-objects/tweets

Import raw data into Hadoop & Hive
Now from the Hive console import the logs into a Hive table.

create table raw_tweets (json string);
load data local inpath 'sample.json' into table raw_tweets;

With Hadoop it is a best practice to always preserve the raw source data. It is pretty common that you detect obscure parse errors or missing information days later, keeping the source information allows you to correct this without much hassle.

Parse JSON tweets and extract relevant information
From this raw data, we parse and extract the actual information that we care about. Using the get_json_object function we can access the "text" and "created_at" attributes using an XPath like query on the JSON object. The timestamp needs to be converted into Unix epoch format for later formatting.

create table tweets as
    select get_json_object(json, "$.text") as text,
           unix_timestamp(get_json_object(json, "$.created_at"),
                "EEE MMM d HH:mm:ss Z yyyy") as ts_created
    from raw_tweets;

Text parsing, extract hashtags and sentiment identification
Now that we got the source data in a format we can deal with (text, timestamp), it is time to identify sentiment information. For this case, we'll compose a regular expression that matches some positive smileys: 🙂 🙂 😉 and ;-). Feel free to expand this to your taste. Also we split the text on whitespaces and identify terms which look like a #hashtag. These matching hashtags we emit together with the date in YYYY-MM-DD format allowing us to do daily aggregations afterwards.

create table positive_hashtags_per_day as
    select from_unixtime(ts_created, 'yyyy-MM-dd') as dt,
        lower(hashtag) as hashtag from tweets
            lateral view explode(split(text, ' ')) b as hashtag
        where ts_created is not null
            and hashtag rlike "^#[a-zA-Z0-9]+$"
            and text rlike "^.*[\;:]-?\$$.*$";

Aggregate occurrences per day
Aggregation is straight forward, just a matter of counting the occurrences per day. Now if you want to do multiple aggregations (per week, per month, etc), you might want to move the date string creation to this step.

create table count_positive_hashtags_per_day as
    select dt, hashtag, count(*) as cnt from positive_hashtags_per_day
        group by dt, hashtag;

Limit top 5 results per day using external reducer
Finally we only want the top 5 results per day. Now this is a bit tricky in Hive, as this requires a user defined function or streaming reducer call. We do the latter using a little piece of Python code that only returns the first 5 results per keyword. Because the input to the reducer call is already secondary sorted on the count in descending order, this will return the top 5 results; just how we want it.

add file topN.py;
create table top5_positive_hashtags_per_day as
    reduce dt, hashtag, cnt
    using 'topN.py 5' as dt, hashtag, cnt
    (select dt, hashtag, cnt from count_positive_hashtags_per_day
        distribute by dt sort by dt, cnt desc) cnts;

The Python reduce code looks like this. Due to the way the Hadoop Streaming API works, you need to detect key boundaries yourself.

#!/usr/bin/env python
# Reducer that returns the top N results per keyword
import sys

maxN = int(sys.argv[1])
last_key = None
count = 0
for line in sys.stdin:
    (key, value) = line.strip().split("\t", 1)
    if key != last_key:
        count = 0
        last_key = key;
    if count < maxN:
        print "%s\t%s" % (key, value)
    count += 1

The results of this exercise on my tiny sample set (can't redistribute the source according to the Twitter TOS):

hive> select * from top5_positive_hashtags_per_day;
2011-02-25 #ff 8
2011-02-25 #happy 3
2011-02-25 #followfriday 2
2011-02-25 #teamzeeti 2
2011-02-25 #baensv 1

2011-02-26 #db40birthday 5
2011-02-26 #bieberfact 2
2011-02-26 #teamfollowback 2
2011-02-26 #feelingsoo 1
2011-02-26 #aktf 1

2011-02-27 #1 2
2011-02-27 #12 1
2011-02-27 #27f 1
2011-02-27 #dvdmeb 1
2011-02-27 #fail 1
Time taken: 4.231 seconds

Follow Fridays make people happy... who would have thought 🙂

Download the full code and Follow @mids106