NamedMapTKey, TValueRunMapReduceMK, MV, OK, OV Method (NamedMapOK, OV, MapperTKey, TValue, MK, MV, CombinerMK, MV, ReducerMK, MV, OK, OV, TimeSpan)

ScaleOut Software NamedCache API
Provides a simple and fast way to perform a MapReduce operation that does not require Hadoop infrastructure.

Namespace:  Soss.Client.Concurrent
Assembly:  soss_namedcache (in soss_namedcache.dll) Version: 6.2.0.0
Syntax

public bool RunMapReduce<MK, MV, OK, OV>(
	NamedMap<OK, OV> outputMap,
	Mapper<TKey, TValue, MK, MV> mapper,
	Combiner<MK, MV> combiner,
	Reducer<MK, MV, OK, OV> reducer,
	TimeSpan timeout
)

Parameters

outputMap
Type: Soss.Client.ConcurrentNamedMapOK, OV
The output named map instance.
mapper
Type: Soss.Client.Concurrent.MapReduceMapperTKey, TValue, MK, MV
Mapper class that inherits from the MapperIK, IV, MK, MV abstract class and implements the Map abstract method.
combiner
Type: Soss.Client.Concurrent.MapReduceCombinerMK, MV
Combiner class that inherits from the CombinerMK, MV abstract class and implements the Combine abstract method.
reducer
Type: Soss.Client.Concurrent.MapReduceReducerMK, MV, OK, OV
Reducer class that inherits from the ReducerMK, MV, OK, OV abstract class and implements the Reduce abstract method.
timeout
Type: SystemTimeSpan
Timeout for the MapReduce operation

Type Parameters

MK
Intermediate key type, the mapper's key output type.
MV
Intermediate value type, the mapper's value output type.
OK
Output key type of the MapReduce operation.
OV
Output value type of the MapReduce operation.

Return Value

Type: Boolean
true if operation succeeds, otherwise false.
Remarks

The supplied mapper, reducer, and combiner objects must inherit from the corresponding abstract classes: MapperIK, IV, MK, MV, CombinerMK, MV and ReducerMK, MV, OK, OV and be passed as arguments to the method. This method runs the MapReduce operation on the data stored within this NamedMapTKey, TValue class instance and writes the resulting key/values elements into the output named map instance referenced via the outputMap parameter.

By default, the NamedMap uses Microsoft's BinaryFormatter serializer, but for the best performance you may want to consider using serializers with more compact output (for example, serializers that implement the Protocol Buffers or MessagePack serialization format).

Examples

Performing a MapReduce operation using the NamedMap class
using System;
using System.Text;
using System.Reflection;
using System.Diagnostics;
using System.Collections.Generic;

using Soss.Client;
using Soss.Client.Concurrent;
using Soss.Client.Concurrent.MapReduce;

/// <summary>
/// This sample illustrates how to run a MapReduce operation using the NamedMap API. 
/// The NamedMap API offers two RunMapReduce method signatures: 
/// 1) the first one is an instance-based method which runs the MapReduce task using locally stored key/value pairs;
/// 2) the second one is a static method that gets the input named map data collection and outputs reduced values
/// into a separate named map.
/// 
/// This sample illustrates use of an instance-based signature. It implements a classic 
/// word count algorithm: it randomly generates "words", counts how many unique words 
/// were generated, and verifies the output using a simple query.
/// </summary>
class MapReduceUsage_NotStatic
{
    static void Main(string[] args)
    {
        NamedMap<int, string> inputMap = null;
        NamedMap<string, int> outputMap = null;
        InvocationGrid grid = null;
        InvocationGridBuilder igBuilder = null;

        int numberOfTextLines = 100;
        int numberOfWordsPerLine = 10;

        // Setup an invocation grid to run the MapReduce operation
        igBuilder = new InvocationGridBuilder("GridForMap");
        igBuilder.AddDependency(Assembly.GetExecutingAssembly());
        igBuilder.LingerTime = TimeSpan.FromSeconds(600);
        grid = igBuilder.Load();

        // Create an instance of the input NamedMap
        inputMap = new NamedMap<int, string>("Test_WordCount_Input");
        inputMap.InvocationGrid = grid;
        inputMap.Clear();

        Console.WriteLine("Populating the input map, adding {0} words", numberOfTextLines * numberOfWordsPerLine);

        BulkLoader<int, string> loader = inputMap.CreateBulkLoader();
        for (int row = 0; row < numberOfTextLines; row++)
            loader.Put(row, GenerateTextLine(numberOfWordsPerLine));
        loader.Close();

        Console.WriteLine("Finished creating the input named map. Running the MapReduce operation...\n");

        outputMap = new NamedMap<string, int>("Test_WordCount_Ouput");
        outputMap.InvocationGrid = grid;

        // The following three classes implement required methods of the corresponding 
        // public Mapper, Combiner and Reducer abstract classes:
        TestWordCountMapper maprMapper = new TestWordCountMapper();
        TestWordCountCombiner maprCombiner = new TestWordCountCombiner();
        TestWordCountReducer maprReducer = new TestWordCountReducer();

        // Run MapReduce
        inputMap.RunMapReduce<string, int, string, int>(outputMap, maprMapper, maprCombiner, maprReducer, TimeSpan.FromSeconds(30));

        // 
        // Query and output the results:
        // 
        IEnumerable<string> keys = outputMap.ExecuteParallelQuery(new SimpleMRQueryCondition());

        // Loop through the results and count the words:
        int queryResultCount = 0;
        int numberOfWords = 0;

        int totalNumber = outputMap.ExecuteCount();

        // Validation
        foreach (string key in keys)
        {
            outputMap.TryGetValue(key, out numberOfWords);
            Console.WriteLine(string.Format("Word: {0}, Count: {1}", key, numberOfWords));
            queryResultCount++;
        }
        Debug.Assert(queryResultCount == totalNumber);

        Console.WriteLine("______________________________________");
        Console.WriteLine(string.Format("Total number of unique words: {0}", totalNumber));
        if (inputMap != null)
            inputMap.Clear();
        if (outputMap != null)
            outputMap.Clear();
        if (grid != null)
            grid.Unload();

        Console.WriteLine("TestWordCount is completed. Press Enter to finish the program.");
        Console.ReadLine();
    } // Main

    public static string GenerateTextLine(int numOfWordsPerRow)
    {
        string alphabet = "abcdefghijklmnopqrstuvwxyz";
        int alphabetSize = alphabet.Length;
        Random random = new Random();

        StringBuilder builder = new StringBuilder();

        // Generate 4 letter words:
        for (int wordPerRow = 1; wordPerRow <= numOfWordsPerRow; wordPerRow++)
        {
            builder.Append(alphabet.Substring(random.Next(0, alphabetSize), 1));
            builder.Append(alphabet.Substring(random.Next(0, alphabetSize), 1));
            builder.Append(alphabet.Substring(random.Next(0, alphabetSize), 1));
            builder.Append(alphabet.Substring(random.Next(0, alphabetSize), 1));

            if (wordPerRow > 0 && wordPerRow % numOfWordsPerRow != 0)
                builder.Append(" ");
        }
        return builder.ToString();
    }
}

[Serializable]
public class TestWordCountMapper : Mapper<int, string, string, int>
{
    const int SPACE = 0x20;

    /// <summary>
    /// Public constructor.
    /// </summary>
    public TestWordCountMapper() { }

    /// <summary>
    /// Performs the map operation for a key-value pair. 
    /// </summary>
    /// <param name="key">Unique word identifier.</param>
    /// <param name="value">Generated word.</param>
    /// <param name="context">Context for emitting intermediate key-value pairs.</param>
    public override void Map(int key, string value, IContext<string, int> context)
    {
        byte[] buf = Encoding.ASCII.GetBytes(value);
        int length = buf.Length;
        int start = 0, cur = 0;
        int SPACE = 0x20;

        // Parsing the line of words
        while (start < length)
        {
            for (cur = start; cur < length; cur++)
            {
                if (buf[cur] == SPACE)
                    break;
            }

            context.Emit(Encoding.ASCII.GetString(buf, start, cur - start), 1);
            start = cur + 1;
        }
    }
} // class TestWordCountMapper

[Serializable]
public class TestWordCountCombiner : Combiner<string, int>
{
    /// <summary>
    /// Combines multiple key's values into a single value. 
    /// </summary>
    /// <param name="key">A word from the generated text block.</param>
    /// <param name="values">Each value represents a number of time
    /// this word appeared within the text block parsed by
    /// each MapReduce split.</param>
    /// <returns>Calculated total value.</returns>
    public override int Combine(string key, IEnumerable<int> values)
    {
        int sum = 0;

        foreach (int val in values)
            sum += val;

        return sum;
    }
} // class TestWordCountCombiner

[Serializable]
public class TestWordCountReducer : Reducer<string, int, string, int>
{
    /// <summary>
    /// Performs the reduce step for each word as a key and the list of its values, which is the number of 
    /// word's occurances counted within each split.
    /// </summary>
    /// <param name="key">Generated word.</param>
    /// <param name="values">The number of times this word occurred in the text so far.</param>
    /// <param name="context">Context for emitting output key-value pairs.</param>
    public override void Reduce(string key, IEnumerable<int> values, IContext<string, int> context)
    {
        int sum = 0;

        foreach (int val in values)
            sum += val;

        context.Emit(key, sum);
    }

} // class TestWordCountReducer

/// <summary>
/// Class representing a query condition to be used in a 
/// NamedMap.ExecuteParallelQuery() call. 
/// This class must be parameterized identically to the output NamedMap.
/// </summary>
[Serializable]
public class SimpleMRQueryCondition : QueryCondition<string, int>
{
    /// <summary>
    /// Default implementation is to return all words. This method can be modified to return a partial match,
    /// e.g., only return words that begin with "a".
    /// </summary>
    /// <param name="word">Key to the string object in the NamedMap representing a word.</param>
    /// <param name="numWordOccurrences">The number of times this word occurs in the input map.</param>
    /// <returns>true if the KV pair is to be included in the query result, otherwise false</returns>
    public override bool CheckCondition(string word, int numWordOccurrences)
    {
        // do nothing
        return true;
    }
}
See Also

Reference