Sample Program: Modified WordCount example

The following program is a modified WordCount example which illustrates many of the approaches outlined in previous sections:

  • It uses NamedMap instances as the input and the output of a ScaleOut hServer job,
  • It uses custom Writable serialization (WritableSerializer) for the keys and values of the named maps, and
  • It performs a parallel query of the output map to determine which words are used more frequently than a provided threshold.
package com.scaleoutsoftware.soss.hserver.examples;

import com.scaleoutsoftware.soss.client.InvocationGrid;
import com.scaleoutsoftware.soss.client.InvokeException;
import com.scaleoutsoftware.soss.client.map.NamedMap;
import com.scaleoutsoftware.soss.client.map.NamedMapFactory;
import com.scaleoutsoftware.soss.client.map.QueryCondition;
import com.scaleoutsoftware.soss.hserver.GridOutputFormat;
import com.scaleoutsoftware.soss.hserver.HServerJob;
import com.scaleoutsoftware.soss.hserver.NamedMapInputFormat;
import com.scaleoutsoftware.soss.hserver.WritableSerializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.StringTokenizer;


public class NamedMapWordCount {
    /**
     * Mapper that breaks each input value into tokens (using the default
     * {@link StringTokenizer} delimiters) and emits a {@code (token, 1)} pair
     * for every token. The input key is not used.
     */
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        // Count emitted with every token; shared because its value never changes.
        private final static IntWritable ONE = new IntWritable(1);

        // Output key instance that is overwritten for each token before it is written.
        private Text token = new Text();

        /**
         * Emits one {@code (token, 1)} pair per token found in {@code value}.
         *
         * @param key     input key; ignored
         * @param value   text to tokenize
         * @param context job context that receives the emitted pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            for (StringTokenizer tokens = new StringTokenizer(value.toString()); tokens.hasMoreTokens(); ) {
                token.set(tokens.nextToken());
                context.write(token, ONE);
            }
        }
    }

    /**
     * Reducer that adds up all counts received for a key and emits a single
     * {@code (key, total)} pair. It is registered by this program as both the
     * combiner and the reducer.
     */
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Output value instance that is overwritten on every reduce call.
        private IntWritable total = new IntWritable();

        /**
         * Sums {@code values} and writes the total for {@code key}.
         *
         * @param key     the word being counted
         * @param values  partial counts for the word
         * @param context job context that receives the {@code (key, total)} pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int runningSum = 0;
            for (IntWritable count : values) {
                runningSum = runningSum + count.get();
            }
            total.set(runningSum);
            context.write(key, total);
        }
    }

    /**
     * Runs the word count job with named maps as input and output, then uses a
     * parallel query on the output map to print every word whose count is
     * greater than the supplied threshold.
     *
     * <p>Expected arguments, after the generic Hadoop options have been removed:
     * {@code <input map> <output map> <threshold>}.
     *
     * <p>Exit codes: {@code 2} for a wrong argument count or a non-integer
     * threshold, {@code 1} if the job reports failure.
     *
     * @param args command line arguments
     * @throws Exception if job setup, job execution, or the parallel query fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: wordcount <input map> <output map> <threshold>");
            System.exit(2);
            return;
        }

        // Parse the threshold up front so that a bad value is reported as a usage
        // error instead of surfacing as an uncaught NumberFormatException.
        final int threshold;
        try {
            threshold = Integer.parseInt(otherArgs[2]);
        } catch (NumberFormatException e) {
            System.err.println("Threshold must be an integer, but was: " + otherArgs[2]);
            System.err.println("Usage: wordcount <input map> <output map> <threshold>");
            System.exit(2);
            return; // not reached; lets the compiler see threshold as assigned below
        }

        //Create named maps
        NamedMap<IntWritable, Text> inputMap = NamedMapFactory.getMap(otherArgs[0],
                new WritableSerializer<IntWritable>(IntWritable.class),
                new WritableSerializer<Text>(Text.class));

        NamedMap<Text, IntWritable> outputMap = NamedMapFactory.getMap(otherArgs[1],
                new WritableSerializer<Text>(Text.class),
                new WritableSerializer<IntWritable>(IntWritable.class));
        outputMap.clear(); //clear output map

        //Create the invocation grid
        InvocationGrid grid = HServerJob.getInvocationGridBuilder("WordCountIG").
                addJar("myjob.jar").
                load();

        Iterable<Text> words = null;
        boolean succeeded;
        try {
            //Create hServer job
            Job job = new HServerJob(conf, "word count", false, grid);
            job.setJarByClass(NamedMapWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setInputFormatClass(NamedMapInputFormat.class);
            job.setOutputFormatClass(GridOutputFormat.class);

            //Set named maps as input and output
            NamedMapInputFormat.setNamedMap(job, inputMap);
            GridOutputFormat.setNamedMap(job, outputMap);

            //Execute job; only query the output map if the job succeeded
            succeeded = job.waitForCompletion(true);
            if (succeeded) {
                //Assign invocation grid to the map, so parallel operation can be performed
                outputMap.setInvocationGrid(grid);

                //Run query to find words that are used more than threshold frequency
                words = outputMap.executeParallelQuery(new UsageFrequencyCondition(threshold));
            }
        } finally {
            //Unload the invocation grid, even if the job or the query threw
            grid.unload();
        }

        if (!succeeded) {
            System.err.println("Word count job failed; no results to report.");
            System.exit(1);
            return;
        }

        //Output resulting words and their frequencies
        System.out.println("Following words were used more than " + threshold + " times:");
        for (Text word : words) {
            System.out.println("\""+word.toString()+"\" was used " + outputMap.get(word) + " times.");
        }
    }

    /**
     * Query condition used for the parallel query of the output map. An entry
     * matches when its count is strictly greater than the configured threshold.
     */
    static class UsageFrequencyCondition implements QueryCondition<Text, IntWritable>
    {
        // Counts must be strictly greater than this value to match.
        private int threshold;

        /**
         * @param frequency threshold that a word's count must exceed to match
         */
        UsageFrequencyCondition(int frequency) {
            threshold = frequency;
        }

        /**
         * Tests a single map entry against the threshold.
         *
         * @param key   the word; not used by the comparison
         * @param value the word's count
         * @return {@code true} if the count is greater than the threshold
         * @throws InvokeException declared by the interface; not thrown by this implementation
         */
        @Override
        public boolean check(Text key, IntWritable value) throws InvokeException {
            final int count = value.get();
            return threshold < count;
        }
    }
}