Wiki PageRank with Hadoop

Alexander Bij

In this tutorial we are going to compute a PageRank for Wikipedia with the help of Hadoop. This was a good hands-on exercise to get started with Hadoop. Page ranking is not a new thing, but it is a suitable use case and way cooler than a word counter! The English Wikipedia has 3.7M articles at the moment and is still growing. Each article has many links to other articles. With those incoming and outgoing links we can determine which pages are more important than others, which is basically what PageRank does.

PageRank
Larry Page came up with the algorithm to determine the ranking of pages in 1996 and later built a search engine around it: Google. He is now the CEO of Google, but only earns 1 dollar a year. I will try to explain the PageRank algorithm and how we will implement it.

In this example I will use 4 pages: A, B, C and D, a non-existing page. D has not been created yet, but is linked to from C. On Wikipedia you recognize those pages as red, underlined links. The links between the pages are as follows:

[Figure: the link relationships between the pages. B links to A; C links to A and to the non-existing page D; A has no outgoing links.]

The rank of A is the highest, because A gets points from both B and C.
The PageRank of page A is the 'share' of the PageRank of the pages linking to A.

The formula for calculating the rank is as follows:

PR(A) = (1 - d) + d * ( PR(T1)/Cout(T1) + ... + PR(Tn)/Cout(Tn) )

where T1...Tn are the pages that link to A and Cout(T) is the number of outgoing links on page T.

For the pages that link to A in our example (B and C), this comes down to:

PR(A) = (1-d) + d( PR(B) / Cout(B) + ... + PR(C) / Cout(C) )

The d in the formula is the damping factor, which simulates 'a random surfer' and is usually set to 0.85. If you are interested in the details, please visit the wiki PageRank page or the 'PageRank explained' page.

If you apply the formula to our example:
PageRank of A = 0.15 + 0.85 * ( PageRank(B)/outgoing links(B) + PageRank(C)/outgoing links(C) )

Calculation of A with the initial ranking 1.0 per page:

If we use the initial rank value 1.0 for A, B and C we get the following output (B has 1 outgoing link, C has 2).
I have skipped page D in the result, because it is not an existing page.
A: 0.15 + 0.85 * ( 1.0/1 + 1.0/2 ) = 1.425
B: 0.15
C: 0.15

Calculation of A with the ranking from iteration 1:

If we use these ranks as input and calculate again:
A: 0.15 + 0.85 * ( 0.15/1 + 0.15/2 ) = 0.34125
B: 0.15
C: 0.15

We see that the PageRank of page A is reduced. The PageRank is based on the previous calculation and gets more accurate after more runs. You can add new pages and new links in the future and calculate the new rankings. This is one of the tools search engines use to build their index. We are going to do this with a set of Wikipedia pages.
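
To make the iteration concrete, here is a minimal sketch of the calculation for the toy example above in plain Java (no Hadoop involved; the pages and links are the ones from the example, the class name is mine):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the PageRank iteration for the toy example:
// B links to A, C links to A and to the non-existing page D, A has no outgoing links.
public class ToyPageRank {

    public static void main(String[] args) {
        double d = 0.85; // damping factor

        Map<String, List<String>> links = new HashMap<String, List<String>>();
        links.put("A", new ArrayList<String>());
        links.put("B", Arrays.asList("A"));
        links.put("C", Arrays.asList("A", "D"));

        // Initial rank 1.0 for every existing page (D gets no rank, it does not exist).
        Map<String, Double> rank = new HashMap<String, Double>();
        for (String page : links.keySet()) {
            rank.put(page, 1.0);
        }

        for (int iteration = 1; iteration <= 5; iteration++) {
            Map<String, Double> newRank = new HashMap<String, Double>();
            for (String page : links.keySet()) {
                double sumShares = 0;
                // Add the share of every page that links to 'page'.
                for (Map.Entry<String, List<String>> entry : links.entrySet()) {
                    if (entry.getValue().contains(page)) {
                        sumShares += rank.get(entry.getKey()) / entry.getValue().size();
                    }
                }
                newRank.put(page, (1 - d) + d * sumShares);
            }
            rank = newRank;
            System.out.println("iteration " + iteration + ": " + rank);
        }
    }
}

The first iteration prints A=1.425, B=0.15, C=0.15, the same values as calculated by hand above.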

Hadoop Setup

In this tutorial I will not explain how to set up Hadoop, because I cannot explain it better than the very good yahoo-hadoop-tutorial and ebiquity-hadoop-tutorial with screenshots. I will be using the current stable version, Hadoop 0.20.2. Note: the Eclipse plugin didn't work for me; I used the latest version instead.

So I assume you have set up a Hadoop configuration with HDFS and an Eclipse environment from which you can upload files into the cluster and execute jobs against your files.

The Plan

We will split the work into three different Hadoop jobs: parsing, calculating and ordering.

Hadoop Job 1 parses the big wiki XML into articles.
In the map phase, get the article's name and its outgoing links.
In the reduce phase, collect for each wiki page the links to other pages.
Store the page, the initial rank and the outgoing links.

Hadoop Job 2 calculates the new PageRank.
In the map phase, map each outgoing link to the page, together with its rank and total number of outgoing links.
In the reduce phase, calculate the new PageRank for the pages.
Store the page, the new rank and the outgoing links.
Repeat these steps for more accurate results.

Hadoop Job 3 orders the result by rank.
In the map phase, map the rank to the page.
Store the rank and the page (ordered on rank).
See the top 10 pages!


Hadoop API

If you use the code in your IDE, you will notice that many of the classes are marked as deprecated. In this example I use the old API, from before 0.20.x. There is the new API (org.apache.hadoop.mapreduce.*) and the old API (org.apache.hadoop.mapred.*). Most examples I found on the internet were based on the old API, which is why I used it here. The changes can be found in the new Hadoop API 0.21. It should not be very difficult to switch to the new API.
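
For comparison, this is roughly what a mapper skeleton looks like against the new API; a sketch only, the class name and body are placeholders and not part of this tutorial's code:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// New-API style: extend the Mapper base class and write output through a Context,
// instead of implementing the Mapper interface and using an OutputCollector.
public class NewApiLinksMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // parse the page and its links here, then:
        // context.write(page, otherPage);
    }
}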

Hadoop Job 1: Parse the XML to Page with Links

Let's take a look at the structure of a page. A page can be downloaded as an XML file by adding Special:Export to the URL. E.g. to get the XML for the wiki page about Hilversum:
http://en.wikipedia.org/wiki/Special:Export/Hilversum

<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.5/"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.5/ http://www.mediawiki.org/xml/export-0.5.xsd"
    version="0.5" xml:lang="en">
<siteinfo>
	<sitename>Wikipedia</sitename>
	<base>http://en.wikipedia.org/wiki/Main_Page</base>
	<generator>MediaWiki 1.17wmf1</generator>
	<case>first-letter</case>
	<namespaces>
	<namespace key="-2" case="first-letter">Media</namespace>
	...
	</namespaces>
</siteinfo>
<page>
	<title>Hilversum</title>
	<id>13686</id>
	<revision>
		<id>449460543</id>
		<timestamp>2011-09-10T06:42:48Z</timestamp>
		<contributor>
		<username>Archengigi</username>
		<id>7283012</id>
		</contributor>
		<comment>Hilversum vlag.svg</comment>
		<text xml:space="preserve" bytes="13996">
		... the page latest revision content with [[LINKS]],
links can point to other pages, files, external sites etc...
		</text>
	</revision>
</page>
</mediawiki>

It is a fairly simple XML structure with some siteinfo metadata and the page with its latest revision. The main parts we are interested in are within the title and the text tags. Download the XML and place it in your HDFS in the /user/[hostname]/[user]/wiki/in directory. When you run the job you will see the location where the file should be placed, so you can put the files in the correct directory after the first run:

11/09/19 12:02:08 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
11/09/19 12:02:08 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://192.168.88.128:54310/user/alexanderlaptop/alexander/wiki/in
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:190)
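
If you prefer to upload the file from code instead of with hadoop fs -put, a minimal sketch using the HDFS FileSystem API could look like this (assuming the cluster configuration is on the classpath, so relative paths resolve to /user/<user>/...):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: copy the exported XML from the local disk into the wiki/in input directory on HDFS.
public class UploadWikiXml {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // reads core-site.xml etc. from the classpath
        FileSystem fs = FileSystem.get(conf);

        fs.mkdirs(new Path("wiki/in"));
        fs.copyFromLocalFile(new Path("Hilversum.xml"), new Path("wiki/in/Hilversum.xml"));

        fs.close();
    }
}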

Let's create the classes in our project for Job 1. The first class we need is the main class that we run against the Hadoop cluster. I called it WikiPageRanking. It will contain all the jobs later, but for now it only contains the first job.
Note: you can view and fetch the source from GitHub: abij/hadoop-wiki-pageranking

public class WikiPageRanking {

    public static void main(String[] args) throws Exception {
        WikiPageRanking pageRanking = new WikiPageRanking();

        //In and Out dirs in HDFS
        pageRanking.runXmlParsing("wiki/in", "wiki/ranking/iter00");
    }

    public void runXmlParsing(String inputPath, String outputPath) throws IOException {
        JobConf conf = new JobConf(WikiPageRanking.class);

        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        // Mahout class to Parse XML + config
        conf.setInputFormat(XmlInputFormat.class);
        conf.set(XmlInputFormat.START_TAG_KEY, "<page>");
        conf.set(XmlInputFormat.END_TAG_KEY, "</page>");
        // Our class to parse links from content.
        conf.setMapperClass(WikiPageLinksMapper.class);

        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        // Our class to create initial output
        conf.setReducerClass(WikiLinksReducer.class);

        JobClient.runJob(conf);
    }
}

This is the main class that we run against the Hadoop cluster; we will add more jobs to it later. You can debug your code (Mapper and Reducer) when you start the program with Debug As... in Eclipse.

The normal InputFormat class is TextInputFormat, which reads the input line by line and passes each line as a value to the map. We want parts of the whole XML to be our input, so I chose the Mahout XmlInputFormat to get suitable input for the mapper. It chops the XML into pieces between the given start and end tags <page> and </page>. From Hilversum.xml we get the value between the page tags.

"]
public class WikiPageLinksMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    private static final Pattern wikiLinksPattern = Pattern.compile("\

.+?\

"); public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { // Returns String[0] = <title>[TITLE]</title> // String[1] = <text>[CONTENT]</text> // !! without the <tags>. String[] titleAndText = parseTitleAndText(value); String pageString = titleAndText[0]; Text page = new Text(pageString.replace(' ', '_')); Matcher matcher = wikiLinksPattern.matcher(titleAndText[1]); //Loop through the matched links in [CONTENT] while (matcher.find()) { String otherPage = matcher.group(); //Filter only wiki pages. //- some have [[realPage|linkName]], some single [realPage] //- some link to files or external pages. //- some link to paragraphs into other pages. otherPage = getWikiPageFromLink(otherPage); if(otherPage == null || otherPage.isEmpty()) continue; // add valid otherPages to the map. output.collect(page, new Text(otherPage)); } } //... the impl of parsePageAndText(..) //... the impl of getWikiPageFromLink(..) } }

This mapper parses the chunks of XML into (page, outgoing link) tuples. In this implementation all links are added to the map output, even if they appear multiple times on the page.
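
The implementations of the two helper methods are omitted above; they are in the GitHub repository. As an illustration, a minimal sketch of what getWikiPageFromLink could look like, assuming links of the form [[Page]], [[Page|label]] or [[Page#Section]]:

    // Sketch of the link-filter helper inside WikiPageLinksMapper.
    // Input is a raw regex match such as [[Hilversum]], [[Hilversum|the city]] or [[File:Foo.svg]].
    private String getWikiPageFromLink(String aLink) {
        // Strip the surrounding brackets.
        String link = aLink.replace("[", "").replace("]", "");

        // Skip external links and links to other namespaces (File:, Image:, Category:, ...).
        if (link.startsWith("http") || link.contains(":")) return null;

        // [[realPage|linkName]] -> keep only the real page name.
        int pipe = link.indexOf('|');
        if (pipe >= 0) link = link.substring(0, pipe);

        // [[Page#Paragraph]] -> keep only the page.
        int hash = link.indexOf('#');
        if (hash >= 0) link = link.substring(0, hash);

        // Use the same underscore convention as the page keys.
        return link.trim().replace(' ', '_');
    }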

public class WikiLinksReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String pagerank = "1.0\t";

        boolean first = true;
        while(values.hasNext()){
            if(!first) pagerank += ",";

            pagerank += values.next().toString();
            first = false;
        }

        output.collect(key, new Text(pagerank));
    }
}

The reducer stores the page with the initial PageRank and the outgoing links. This output format is used as the input format for the next job: page<tab>rank<tab>comma-separated-list-of-outgoing-links.

First Run result:

Hilversum 1.0 Country,Netherlands,Province,North_Holland,Mayor,Democrats_66,A...

Get a bigger file! The 500 MB latest Dutch wiki dump is a sufficient start. Extracted, the big XML is around 2.3 GB.

Upload the file to your HDFS in the wiki/in folder and remove the old result folder 'ranking'. Hadoop throws an exception if you are about to overwrite existing results. It would be a pity if your job ran for 3 days and another job overwrote the results without notice.
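
If you prefer not to remove the old results by hand before every run, a sketch of cleaning them up from code (my own addition; FileSystem.delete is recursive and destructive, so only point it at directories you really consider scratch output):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: remove the previous 'wiki/ranking' output tree, because Hadoop
// refuses to write into an output directory that already exists.
public class CleanOldResults {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        Path oldResults = new Path("wiki/ranking");
        if (fs.exists(oldResults)) {
            fs.delete(oldResults, true); // true = recursive
        }
    }
}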

Hadoop Job 2: Calculate the new PageRank

This job calculates the new ranking and generates the same output format as its input, so it can run multiple times. We run this job after Job 1. The PageRank becomes more accurate after multiple runs, so we will execute the job a few times.

Mapper

This job has its own mapper and reducer classes:

sample input:
---------------------------------------
Page_A 1.0
Page_B 1.0 Page_A
Page_C 1.0 Page_A,Page_D

public class RankCalculateMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        int pageTabIndex = value.find("\t");
        int rankTabIndex = value.find("\t", pageTabIndex+1);

        String page = Text.decode(value.getBytes(), 0, pageTabIndex);
        String pageWithRank = Text.decode(value.getBytes(), 0, rankTabIndex+1);

        // Mark page as an Existing page (ignore red wiki-links)
        output.collect(new Text(page), new Text("!"));

        // Skip pages with no links.
        if(rankTabIndex == -1) return;

        String links = Text.decode(value.getBytes(), rankTabIndex+1, value.getLength()-(rankTabIndex+1));
        String[] allOtherPages = links.split(",");
        int totalLinks = allOtherPages.length;

        for (String otherPage : allOtherPages){
            // Value format: <sourcePage>\t<rank>\t<totalLinks>
            Text pageRankTotalLinks = new Text(pageWithRank + totalLinks);
            output.collect(new Text(otherPage), pageRankTotalLinks);
        }

        // Put the original links of the page for the reduce output
        output.collect(new Text(page), new Text("|"+links));
    }
}

Some links point to wiki pages that do not exist (yet); in the browser you see them as red links. In the result I want to skip those non-existing pages, so I mark each page with an exclamation mark to indicate that it is an actual wiki page. The reducer class uses only these marked pages to generate output.

For each outgoing link there is an output record with the combined value: source page, rank and total number of links.

The last output of the mapper is the page with its original links. We need the links so the reducer is able to produce the correct output.

sample output:
---------------------------------------
Page_A !
Page_C |Page_A
Page_B !
Page_B |Page_A
Page_A Page_B 1.0 1
Page_C !
Page_A Page_C 1.0 2
Page_D Page_C 1.0 2

Reducer

The reducer receives the values per key, ordered by key. In a distributed environment the map output is cut into partitions and every node gets its share. The reducer calculates the new PageRank and writes it to the output for the existing pages, together with their original links.

sample input (sorted on key):
---------------------------------------
Page_A !
Page_A Page_C 1.0 2
Page_A Page_B 1.0 1
Page_B !
Page_B |Page_A
Page_C !
Page_C |Page_A
Page_D Page_C 1.0 2

public class RankCalculateReduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    private static final float damping = 0.85F;

    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
        boolean isExistingWikiPage = false;
        String[] split;
        float sumShareOtherPageRanks = 0;
        String links = "";
        String pageWithRank;

        // For each otherPage:
        // - check control characters
        // - calculate pageRank share <rank> / count(<links>)
        // - add the share to sumShareOtherPageRanks
        while(values.hasNext()){
            pageWithRank = values.next().toString();

            if(pageWithRank.equals("!")) {
                isExistingWikiPage = true;
                continue;
            }

            if(pageWithRank.startsWith("|")){
                links = "\t"+pageWithRank.substring(1);
                continue;
            }

            // Value format: <sourcePage>\t<rank>\t<totalLinks>
            split = pageWithRank.split("\\t");

            float pageRank = Float.valueOf(split[1]);
            int countOutLinks = Integer.valueOf(split[2]);

            sumShareOtherPageRanks += (pageRank/countOutLinks);
        }

        if(!isExistingWikiPage) return;
        float newRank = damping * sumShareOtherPageRanks + (1-damping);

        out.collect(key, new Text(newRank + links));
    }
}

The output of the reducer contains the new pageRank for the existing pages with the links on those pages.

sample output:
Page_A 1.425
Page_B 0.15 Page_A
Page_C 0.15 Page_A,Page_D

We need to extend the main class so that the new job is executed a couple of times after the XML-parsing job. I have commented out the last job for now; we will create it in the next section.

public class WikiPageRanking {

    private static NumberFormat nf = new DecimalFormat("00");

    public static void main(String[] args) throws Exception {
        WikiPageRanking pageRanking = new WikiPageRanking();

        //Job 1: Parse XML
        pageRanking.runXmlParsing("wiki/in", "wiki/ranking/iter00");

        int runs = 0;
        for (; runs < 5; runs++) {
            //Job 2: Calculate new rank
            pageRanking.runRankCalculation("wiki/ranking/iter"+nf.format(runs), "wiki/ranking/iter"+nf.format(runs + 1));
        }

        //Job 3: Order by rank
        //pageRanking.runRankOrdering("wiki/ranking/iter"+nf.format(runs), "wiki/result");

    }

    public void runXmlParsing(String inputPath, String outputPath) throws IOException {
        JobConf conf = new JobConf(WikiPageRanking.class);

        conf.set(XmlInputFormat.START_TAG_KEY, "<page>");
        conf.set(XmlInputFormat.END_TAG_KEY, "</page>");

        // Input / Mapper
        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        conf.setInputFormat(XmlInputFormat.class);
        conf.setMapperClass(WikiPageLinksMapper.class);

        // Output / Reducer
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setReducerClass(WikiLinksReducer.class);

        JobClient.runJob(conf);
    }

    private void runRankCalculation(String inputPath, String outputPath) throws IOException {
        JobConf conf = new JobConf(WikiPageRanking.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));

        conf.setMapperClass(RankCalculateMapper.class);
        conf.setReducerClass(RankCalculateReduce.class);

        JobClient.runJob(conf);
    }

/*
    private void runRankOrdering(String inputPath, String outputPath) throws IOException {
        JobConf conf = new JobConf(WikiPageRanking.class);

        conf.setOutputKeyClass(FloatWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));

        conf.setMapperClass(RankingMapper.class);

        JobClient.runJob(conf);
    }
*/
}

I have added a loop around the execution of Job 2. It takes its input from wiki/ranking/iter00 for the first run and creates its output in wiki/ranking/iter01. For the next run the directory iter01 is the input directory. When the loop is finished, Job 3 gets the last iterXX directory as input for the final job, the ordering.
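
As a side note, the 'Applications should implement Tool' warning from the log earlier can be silenced by wrapping the driver in Hadoop's Tool/ToolRunner, which also gives you the generic -D, -fs and -jt options for free. A minimal sketch (the class name and the delegation are my own, not part of the tutorial code):

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: run the existing driver through ToolRunner so that Hadoop
// parses the generic command line options for us.
public class WikiPageRankingTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        WikiPageRanking pageRanking = new WikiPageRanking();
        pageRanking.runXmlParsing("wiki/in", "wiki/ranking/iter00");
        // ... run the calculation iterations and the ordering job here,
        // exactly as in the main method above.
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new WikiPageRankingTool(), args));
    }
}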

Job 3: Order the last run on PageRank

This is a simple job that reads the input to get the page and the rank and maps the key rank to the value page. Hadoop does the sorting on the key for us. We don't need to implement a reducer; the mapper plus the sorting is enough for our result, the ordered list.

sample input:
---------------------------------------
Page_A 1.425
Page_B 0.15 Page_A
Page_C 0.15 Page_A,Page_D

public class RankingMapper extends MapReduceBase implements Mapper<LongWritable, Text, FloatWritable, Text> {

    @Override
    public void map(LongWritable key, Text value, OutputCollector<FloatWritable, Text> output, Reporter arg3) throws IOException {
        String[] pageAndRank = getPageAndRank(key, value);

        float parseFloat = Float.parseFloat(pageAndRank[1]);

        Text page = new Text(pageAndRank[0]);
        FloatWritable rank = new FloatWritable(parseFloat);

        output.collect(rank, page);
    }

    private String[] getPageAndRank(LongWritable key, Text value) throws CharacterCodingException {
        String[] pageAndRank = new String[2];
        int tabPageIndex = value.find("\t");
        int tabRankIndex = value.find("\t", tabPageIndex + 1);

        // no tab after rank (when there are no links)
        int end;
        if (tabRankIndex == -1) {
            end = value.getLength() - (tabPageIndex + 1);
        } else {
            end = tabRankIndex - (tabPageIndex + 1);
        }

        pageAndRank[0] = Text.decode(value.getBytes(), 0, tabPageIndex);
        pageAndRank[1] = Text.decode(value.getBytes(), tabPageIndex + 1, end);

        return pageAndRank;
    }

}

The sorting on the key is ascending, so the highest-ranked page is at the bottom. Preferably the job would order descending, but for now the result is ordered and that is good enough. Now we can uncomment Job 3 in the main class and execute all jobs together against the big dataset.

sample output:
---------------------------------------
0.15 Page_B
0.15 Page_C
1.425 Page_A
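
If you do want the highest-ranked pages at the top, a small sketch of a reversed comparator for the ordering job (my own addition, not in the repository):

import org.apache.hadoop.io.FloatWritable;

// Sketch: sort the FloatWritable keys of Job 3 in descending order by
// negating the byte-level comparison of the standard float comparator.
public class DescendingFloatComparator extends FloatWritable.Comparator {

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }
}

Register it in runRankOrdering with conf.setOutputKeyComparatorClass(DescendingFloatComparator.class) before running the job.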

Running the big dataset (1 node)

On my laptop I used a virtual machine for the Hadoop setup. Parsing the XML, calculating 5 times and ordering took in total:

Time: 15 minutes
Input file: ~2.3 GB
Each rank file: 238 MB
Result file: 22 MB

I will not spoil the actual results for you; you should see them for yourself after some heavy data crunching!

It would be nice to execute it on a cluster with multiple nodes and experience the speed, load balancing and failover. That's something for the next blog. When I have used a bigger cluster I will update this post.

You can view/download the source files from github.

Comments

  1. Ajay - Reply

    October 13, 2011 at 4:34 am

    Hi,
    I didn't understand what's the content of the dataset you have provided. Can you explain it?

    • abij - Reply

      October 14, 2011 at 4:52 pm

      Hey Ajay,

      Eventually I want the links on the wiki-pages to other wiki pages. These links are part of the text on every wiki-page.

      The content of the dataset is the text on all wiki-pages for a given language. All pages are stored in the wiki-dump-xml. For each wiki page there is a page-tag in the big xml. The content of the page is located between the text-tag inside the page-tag.

      I hope this makes it more clear.

  2. Anca - Reply

    December 17, 2011 at 2:51 pm

    Hi Abij.

    I have some questions regarding the execution of the 3 jobs:
    1. Did you run the jobs simultaneously?
    2. Did you split the 2 GB input file into smaller files before running the jobs?
    3. Did you run the program on a single node? How many map tasks did you use?

    Thanks,
    Anca

    • abij - Reply

      January 6, 2012 at 2:35 pm

      Hey Anca,

      1: No, the jobs are dependent on each other. Job 1 runs only once to create the input in the right format for the second job.
      Job 2 runs iteratively, using the previously calculated results.
      Job 3 orders the final results and is the last job, also executed once.
      Because of this order the jobs cannot run simultaneously.
      2: No, I have used 1 big file.
      3: Yes, I have used my own PC. I think I used the defaults; I don't know the number of tasks exactly.

      If you are interested in the performance you can run the job yourself with source, or I can make you the job jar.

      • Kai - Reply

        February 12, 2012 at 11:17 pm

        Hi abij, can you send me please the jar file, I want to run the job on my own PC (k.elloumi@gmail.com)
        Thanks
        Kai


  3. Fadi - Reply

    March 21, 2012 at 9:01 pm

    Hi abij,
    Excellent tutorial, the information flows smoothly. But I couldn't find the dataset; is the link broken?
    can you send me please the jar file, I want to run the job on my own PC (fadi20052002@gmail.com)
    Thanks
    Fad

  4. Anca - Reply

    May 28, 2012 at 1:25 pm

    Hi Abij,

    I ran the PageRank job for the same data input twice and got different results.
    Is this possible? Am I missing something?

    Thanks,
    Anca

  5. Athresh - Reply

    December 30, 2012 at 1:10 am

    Hey,

    Have you tried using the Amazon Elastic Map Reduce to compute the pagerank? If yes, how did you go about doing it?

    • Alexander Bij - Reply

      December 30, 2012 at 9:44 pm

      Hey Athresh,

      No I have not, but it would be interesting to see whether the same results are calculated.
      This page-ranking algorithm was my own implementation of the formula. I could compare it with the Amazon Elastic MapReduce implementation for any differences.

      At the beginning of the new year I'll pick up big data again and post some new things related to this.
      Greetz + happy new year

  6. hobbit - Reply

    February 26, 2013 at 1:00 pm

    hi everyone...i have a prob in execution...
    I installed hadoop on single node cluster and i m running job1(linksmapper.java reducer.java xmlinput.java and WikiPageranking.java ).i created a pagerank.jar using above 4 files and compiled the above files.first 3 java files executed well.my prob with WikiPageRanking.java
    import WikiPageLinksMapper;
    import WikiMapReducer;
    import Xmlinput;
    this is my code(file names are not exactly correct)
    my errors are
    expecting '.' at import import WikiPageLinksMapper ;
    ^
    expecting ';' at import import WikiPageLinksMapper ;
    ^
    for 3 files import....
    can anyone please help me
    thanx in advance...

    • Alexander Bij - Reply

      March 14, 2013 at 12:47 pm

        Make sure the single-node cluster is on the same Hadoop version (0.20.204.0).
        I should update the code and dependencies to the latest version.
