NamedMap<TKey, TValue>.RunMapReduce<MK, MV, OK, OV> Method (NamedMap<TKey, TValue>, NamedMap<OK, OV>, Mapper<TKey, TValue, MK, MV>, Combiner<MK, MV>, Reducer<MK, MV, OK, OV>, TimeSpan)

ScaleOut Software NamedCache API
Provides a simple and fast way to perform a MapReduce operation that does not require Hadoop infrastructure.

Namespace:  Soss.Client.Concurrent
Assembly:  soss_namedcache (in soss_namedcache.dll) Version: 6.1.0.0
Syntax

public static bool RunMapReduce<MK, MV, OK, OV>(
	NamedMap<TKey, TValue> inputMap,
	NamedMap<OK, OV> outputMap,
	Mapper<TKey, TValue, MK, MV> mapper,
	Combiner<MK, MV> combiner,
	Reducer<MK, MV, OK, OV> reducer,
	TimeSpan timeout
)

Parameters

inputMap
Type: Soss.Client.Concurrent.NamedMap<TKey, TValue>
The input named map instance.
outputMap
Type: Soss.Client.Concurrent.NamedMap<OK, OV>
The output named map instance.
mapper
Type: Soss.Client.Concurrent.MapReduce.Mapper<TKey, TValue, MK, MV>
Mapper class that inherits from the Mapper<IK, IV, MK, MV> abstract class and implements the Map abstract method.
combiner
Type: Soss.Client.Concurrent.MapReduce.Combiner<MK, MV>
Combiner class that inherits from the Combiner<MK, MV> abstract class and implements the Combine abstract method.
reducer
Type: Soss.Client.Concurrent.MapReduce.Reducer<MK, MV, OK, OV>
Reducer class that inherits from the Reducer<MK, MV, OK, OV> abstract class and implements the Reduce abstract method.
timeout
Type: System.TimeSpan
Timeout for the MapReduce operation.

Type Parameters

MK
Intermediate key type, the mapper's key output type.
MV
Intermediate value type, the mapper's value output type.
OK
Output key type of the MapReduce operation.
OV
Output value type of the MapReduce operation.

Return Value

Type: Boolean
true if the operation succeeds; otherwise, false.
Remarks

The supplied mapper, combiner, and reducer objects must inherit from the corresponding abstract classes (Mapper<IK, IV, MK, MV>, Combiner<MK, MV>, and Reducer<MK, MV, OK, OV>) and are passed as arguments to the method. This static method runs the MapReduce operation on the data in the named map passed via the inputMap parameter and writes the resulting key/value pairs into the output named map referenced by the outputMap parameter.

By default, the NamedMap uses Microsoft's BinaryFormatter serializer, but for the best performance you may want to consider using serializers with more compact output (for example, serializers that implement the Protocol Buffers or MessagePack serialization format).

Examples

Performing a MapReduce operation using the NamedMap class
using System;
using System.Text;
using System.Reflection;
using System.Diagnostics;
using System.Collections.Generic;

using Soss.Client;
using Soss.Client.Concurrent;
using Soss.Client.Concurrent.MapReduce;

/// <summary>
/// This sample illustrates how to run a MapReduce operation using the NamedMap API,
/// which offers two signatures of the RunMapReduce method: the first is an instance method
/// that runs the MapReduce task over locally stored key/value pairs; the second is a static
/// method that takes the input named map's data collection and writes the reduced values
/// into a separate named map.
///
/// This sample illustrates use of the static RunMapReduce method signature. It implements
/// the classic word-count algorithm: first it generates rows of words, then it counts how
/// many times each unique word occurs. Finally it uses a simple query to check the validity
/// of the output.
/// </summary>
class MapReduceUsage_Static
{
    // One generator shared by every GenerateTextLine call. Creating a new Random per call
    // inside a tight loop can reuse the same time-based seed on .NET Framework, which would
    // make many generated lines identical. Random is not thread-safe; this sample only
    // calls it from Main's thread.
    private static readonly Random s_random = new Random();

    /// <summary>
    /// Loads an invocation grid, fills the input map with random text lines, runs the static
    /// RunMapReduce word count into the output map, prints the results, and finally clears
    /// both maps and unloads the grid.
    /// </summary>
    static void Main(string[] args)
    {
        NamedMap<int, string> inputMap = null;
        NamedMap<string, int> outputMap = null;
        InvocationGrid grid = null;

        int numberOfTextLines = 100;
        int numberOfWordsPerLine = 10;

        // Cleanup runs in the finally block so the maps are cleared and the grid is
        // unloaded even if loading, populating, or the MapReduce run throws.
        try
        {
            InvocationGridBuilder igBuilder = new InvocationGridBuilder("GridForMap");
            igBuilder.AddDependency(Assembly.GetExecutingAssembly());
            igBuilder.LingerTime = TimeSpan.FromSeconds(600);
            grid = igBuilder.Load();

            inputMap = new NamedMap<int, string>("Test_WordCount_Input");
            inputMap.ParallelOperationTransport = ParallelOperationTransport.UseSockets;
            inputMap.AutoCorrectStreamPosition = AutoCorrectStreamPosition.Always;
            inputMap.InvocationGrid = grid;
            inputMap.Clear();

            Console.WriteLine("Populating the input map, adding {0} words", numberOfTextLines * numberOfWordsPerLine);

            BulkLoader<int, string> loader = inputMap.CreateBulkLoader();
            for (int row = 0; row < numberOfTextLines; row++)
                loader.Put(row, GenerateTextLine(numberOfWordsPerLine));
            loader.Close();

            Console.WriteLine("Finished creating the input named map. Running the MapReduce operation...\n");

            outputMap = new NamedMap<string, int>("Test_WordCount_Ouput");
            outputMap.InvocationGrid = grid;

            // The TestWordCountMapper, TestWordCountCombiner and TestWordCountReducer classes
            // implement required methods of the corresponding public Mapper, Combiner and Reducer abstract classes:
            bool succeeded = NamedMap<int, string>.RunMapReduce<string, int, string, int>(
                inputMap,
                outputMap,
                new TestWordCountMapper(),
                new TestWordCountCombiner(),
                new TestWordCountReducer(),
                SossTimeout.InfiniteTimeSpan);

            if (!succeeded)
            {
                // RunMapReduce reports failure through its return value; don't query the output map in that case.
                Console.WriteLine("The MapReduce operation failed.");
            }
            else
            {
                // Query and output the results:
                IEnumerable<string> keys = outputMap.ExecuteParallelQuery(new SimpleMRQueryCondition());
                int totalNumber = outputMap.ExecuteCount();

                // Validation: every key returned by the query is printed and counted, and
                // that count must match the number of entries in the output map.
                int queryResultCount = 0;
                foreach (string key in keys)
                {
                    int numberOfWords;
                    outputMap.TryGetValue(key, out numberOfWords);
                    Console.WriteLine("Word: {0}, Count: {1}", key, numberOfWords);
                    queryResultCount++;
                }
                Debug.Assert(queryResultCount == totalNumber);

                Console.WriteLine("______________________________________");
                Console.WriteLine(string.Format("Total number of unique words: {0}", totalNumber));
            }
        }
        finally
        {
            if (inputMap != null)
                inputMap.Clear();
            if (outputMap != null)
                outputMap.Clear();
            if (grid != null)
                grid.Unload();
        }

        Console.WriteLine("TestWordCount is completed. Press Enter to finish the program.");
        Console.ReadLine();
    } // Main

    /// <summary>
    /// Generates a line of random four-letter lowercase words separated by single spaces.
    /// </summary>
    /// <param name="numOfWordsPerRow">Number of words to put on the line.</param>
    /// <returns>
    /// The generated line, with no leading or trailing space; an empty string when
    /// <paramref name="numOfWordsPerRow"/> is zero or negative.
    /// </returns>
    public static string GenerateTextLine(int numOfWordsPerRow)
    {
        const string alphabet = "abcdefghijklmnopqrstuvwxyz";
        const int lettersPerWord = 4;

        StringBuilder builder = new StringBuilder();
        for (int word = 0; word < numOfWordsPerRow; word++)
        {
            // Separator goes before every word except the first, so there is no trailing space.
            if (word > 0)
                builder.Append(' ');

            for (int letter = 0; letter < lettersPerWord; letter++)
                builder.Append(alphabet[s_random.Next(alphabet.Length)]);
        }
        return builder.ToString();
    }
}

/// <summary>
/// Mapper for the word-count sample: splits each input line into words and emits
/// a (word, 1) pair for every word found.
/// </summary>
[Serializable]
public class TestWordCountMapper : Mapper<int, string, string, int>
{
    // Words are delimited by the space character, matching the lines built by the sample's generator.
    private static readonly char[] s_separators = { ' ' };

    public TestWordCountMapper() { }

    /// <summary>
    /// Emits (word, 1) for each space-delimited word in <paramref name="value"/>.
    /// </summary>
    /// <param name="key">Key of the input entry (the row number in this sample); not used.</param>
    /// <param name="value">Line of text to tokenize; a null or empty line emits nothing.</param>
    /// <param name="context">Context that receives the emitted intermediate key/value pairs.</param>
    public override void Map(int key, string value, IContext<string, int> context)
    {
        if (string.IsNullOrEmpty(value))
            return;

        // RemoveEmptyEntries keeps leading, trailing, or repeated spaces from producing
        // empty "words". Working on the string directly (rather than ASCII bytes) also
        // preserves non-ASCII characters that Encoding.ASCII would replace with '?'.
        foreach (string word in value.Split(s_separators, StringSplitOptions.RemoveEmptyEntries))
            context.Emit(word, 1);
    }
} // class TestWordCountMapper

/// <summary>
/// Combiner for the word-count sample: collapses the partial counts emitted for one
/// word into a single running total before they are sent on to the reducer.
/// </summary>
[Serializable]
public class TestWordCountCombiner : Combiner<string, int>
{
    public TestWordCountCombiner()
    {
    }

    /// <summary>
    /// Adds together all partial counts for <paramref name="key"/>.
    /// </summary>
    /// <param name="key">The word whose counts are being combined (not used in the sum).</param>
    /// <param name="values">Partial counts for the word.</param>
    /// <returns>The sum of <paramref name="values"/>.</returns>
    public override int Combine(string key, IEnumerable<int> values)
    {
        int runningTotal = 0;
        using (IEnumerator<int> cursor = values.GetEnumerator())
        {
            while (cursor.MoveNext())
            {
                runningTotal += cursor.Current;
            }
        }
        return runningTotal;
    }
} // class TestWordCountCombiner

/// <summary>
/// Reducer for the word-count sample: adds up every count received for a word and
/// emits a single (word, total) pair into the output.
/// </summary>
[Serializable]
public class TestWordCountReducer : Reducer<string, int, string, int>
{
    public TestWordCountReducer()
    {
    }

    /// <summary>
    /// Emits <paramref name="key"/> paired with the sum of its <paramref name="values"/>.
    /// </summary>
    /// <param name="key">The word being reduced.</param>
    /// <param name="values">All counts gathered for the word.</param>
    /// <param name="context">Context that receives the final (word, total) pair.</param>
    public override void Reduce(string key, IEnumerable<int> values, IContext<string, int> context)
    {
        int wordTotal = Total(values);
        context.Emit(key, wordTotal);
    }

    // Sums the counts gathered for a single word.
    private static int Total(IEnumerable<int> counts)
    {
        int accumulated = 0;
        using (IEnumerator<int> e = counts.GetEnumerator())
        {
            while (e.MoveNext())
                accumulated += e.Current;
        }
        return accumulated;
    }
} // class TestWordCountReducer

/// <summary>
/// Query condition passed to NamedMap.ExecuteParallelQuery() in the word-count sample.
/// It accepts every entry, so the query selects all words in the output map.
/// </summary>
[Serializable]
public class SimpleMRQueryCondition : QueryCondition<string, int>
{
    /// <summary>
    /// Accepts every entry unconditionally.
    /// </summary>
    /// <param name="word">Key of the entry: a word from the output map.</param>
    /// <param name="numberOfWords">Value of the entry: how many times the word was counted.</param>
    /// <returns>Always true.</returns>
    public override bool CheckCondition(string word, int numberOfWords)
    {
        // No filtering: every word qualifies.
        return true;
    }
}
See Also

Reference