Distributed Aggregation with ForEach

ScaleOut Software NamedCache API
Usage Overview

Consider the following object model of stock equities and their price histories:

Equities and price histories
[Serializable]
public class Stock
{
    public string Ticker { get; }

    [SossIndex(HashIndexPriority.HighPriorityHashable)]
    public string Exchange { get; }

    public IDictionary<DateTime, ClosingPrice> ClosingPrices { get; }

    public Stock(string ticker, string exchange)
    {
        Ticker = ticker;
        Exchange = exchange;
        ClosingPrices = new Dictionary<DateTime, ClosingPrice>(365);
    }
}

[Serializable]
public class ClosingPrice
{
    public decimal Price { get; }
    public long Volume { get; }

    public ClosingPrice(decimal price, long volume)
    {
        Price = price;
        Volume = volume;
    }
}

(You may notice that we've marked the Stock.Exchange property with a [SossIndex] attribute—we'll cover this in a later section.)

Instances of the stock class can be stored in a distributed ScaleOut NamedCache collection using ordinary CRUD-style operations:

Loading the named cache
NamedCache nc = CacheFactory.GetCache("Price Histories");

Stock msftHistory = new Stock("MSFT", "NYSE");
msftHistory.ClosingPrices.Add(DateTime.Today, new ClosingPrice(51.02M, 22862552));
msftHistory.ClosingPrices.Add(DateTime.Today.AddDays(-1), new ClosingPrice(50.07M, 17844600));
//...

nc.Add("MSFT", msftHistory);

Stock orclHistory = new Stock("ORCL", "NYSE");
// ... load 4000 more equities with histories

Once we've loaded the all of the US publicly-traded companies and their histories into the distributed ScaleOut data grid, the ForEach operator can be used to perform arbitrarily complex analysis of the objects in parallel across the farm.

For example, to find the ticker symbols of all stocks whose price increased more than 1% today from yesterday's closing price, the ForEach operator could be used to evaluate all of the stock objects across the server farm in parallel:

Executing a distributed ForEach operation
// Select stocks from the ScaleOut distributed data grid to evaluate. We 
// could use a where clause here to run an initial filter on the 
// involved stocks, if desired.
var stocks = from stock in nc.QueryObjects<Stock>()
             where stock.Exchange == "NYSE"
             select stock;

// Run ForEach against the query result to create a list of 
// stocks that gained over 1% from yesterday's closing price.
decimal minPriceIncreasePct = 1.00M;

var winners = stocks.ForEach<Stock, decimal, List<string>>(
    param: minPriceIncreasePct,
    localInit: () => new List<string>(),

    body: (stock, minPct, threadlocalResult) => {
        decimal todaysPrice = stock.ClosingPrices[DateTime.Today].Price;
        decimal yesterdaysPrice = stock.ClosingPrices[DateTime.Today.AddDays(-1)].Price;
        decimal priceChange = todaysPrice - yesterdaysPrice;

        if ((priceChange / yesterdaysPrice) * 100 > minPct)
            threadlocalResult.Add(stock.Ticker);

        return threadlocalResult;
    },

    merge: (result1, result2) => {
        result1.AddRange(result2);
        return result1;
    }
);

foreach (var ticker in winners)
    Console.WriteLine(ticker);

When the ForEach method is called, ScaleOut's compute engine ships your supplied body and merge logic to compute nodes in the data store and runs the operation in parallel across all hosts in the farm.

Concepts

.NET developers are accustomed to using the TPL's Parallel.ForEach method to harness all the cores on a system for data parallel workloads. However, when workloads are too large or too CPU-intensive for a single machine, developers are forced to move to elaborate distributed computing platforms, losing the simplicity, safety, and productivity of the TPL's intuitive programming model.

ScaleOut Software's distributed in-memory computing platform provides a LINQ ForEach operator that's modeled after the TPL's ForEach method, allowing this familiar programming model to be applied to a highly-scalable distributed computing platform.

ScaleOut's ForEach method operates on a distributed collection that transparently spreads its elements across a cluster of servers running the ScaleOut service. ScaleOut's in-memory compute engine ships assemblies containing the loop's body and aggregation logic out to all machines in the cluster and then executes the work on all of the systems in parallel.

Thread-Local Results

To minimize synchronization overhead while executing the body of the ForEach call, every thread on every host on the farm uses a thread-local result object that is then passed to the body method.

Instances of a thread-local result object are created by the localInit argument, a user-supplied delegate that is responsible for initializing a worker thread's result object.

The result of the localInit routine is passed to the first iteration of the body method that is executed by that thread, and the result of that iteration is passed into the next body iteration, and so on, until the body logic has been executed against all objects being evaluated by the thread.

In the preceding example, the localInit routine simply instantiates a new, empty list of strings for a worker thread to use for collecting ticker symbols.

Merging Results

The TPL's Parallel.ForEach pattern for aggregation uses a "localFinally" delegate to combine thread-local results into a single result for the caller. Parallel.ForEach users typically rely on lambda capture to access and modify the final result in the localFinally method.

In a distributed environment, there is no single, global result value that can be efficiently accessed by all threads across the server farm, so ScaleOut's ForEach operator uses a subtly different aggregation model than the TPL.

Instead of a localFinally delegate that takes a single thread-local argument, ScaleOut's ForEach operator uses a "merge" delegate that takes two thread-local results and returns a single, combined result. This user-supplied merge method is similar in spirit to a localFinally method: it implements the logic for aggregation across threads and hosts, but instead of combining results into one global result value, a merge method must combine two arguments and return a single value.

ScaleOut's compute engine takes care of calling the merge method as needed. Behind the scenes, the engine combines results across threads and servers in parallel using a built-in binary merge tree to minimize data transfer over the network.

Parameters

The TPL's Parallel.ForEach pattern relies on lambda capture to access any external, outer values that are needed in the body of the ForEach operation. However, lambda capture can be complex and problematic in distributed computing because the outer value would need be serialized and shipped to compute nodes. To simplify the model and make the operation's parameters explicitly clear, the ForEach method accepts a param argument.

The supplied param value is serialized and shipped to all compute nodes in the ScaleOut data grid. The compute engine then supplies this value as an argument into your body logic.

In the preceding example, the param argument is a minimum percentage gain for an equity's closing price. The example's body logic uses this value as the basis for deciding whether to include a stock in the result.

Filtering Participating Objects

The TPL's Parallel.ForEach pattern runs your body logic against all elements in an enumerable collection. ScaleOut's distributed ForEach operator, however, allows you to be more selective about the objects that you evaluate.

Every object involved in a distributed ForEach operation needs to be deserialized by the compute engine's worker process prior to being passed into your body logic. Avoiding unnecessary deserialization with a LINQ where clause will improve performance of the ForEach operation.

For example, in the preceding example, if we wanted to limit the selection so that it only evaluated objects in the New York Stock Exchange, we could refine our initial selection criteria with a LINQ where clause:

Filtering objects prior to a ForEach operation
var nyseStocks = from stock in nc.QueryObjects<Stock>()
                 where stock.Exchange == "NYSE"
                 select stock;

The ScaleOut service would then use the its internal index of the Stock.Exchange property to efficiently filter the stocks being evaluated. (The [SossIndex] attribute that we placed on the Stock.Exchange property is how we instructed the ScaleOut service to index our Stock objects.)

Performance Tips

  • Consider using custom serialization. By default, the ScaleOut APIs use .NET’s standard BinaryFormatter to serialize your cached objects. While convenient, the BinaryFormatter may not offer the best performance for your object types. The NamedCacheSetCustomSerialization method allows you to plug in custom serialization/deserialization methods.

  • Use a type other than strings as keys to objects in a ScaleOut NamedCache. String keys can be a convenient way to identify your objects in the in-memory data grid, but, behind the scenes, string keys must be encoded, hashed, and sent to the ScaleOut service for storage. Byte arrays (up to 32 bytes) or Guids are more efficient as keys than arbitrary-length strings.

  • Using a large client cache allows ScaleOut's worker processes to perform aggressive in-process caching to minimize serialization overhead.

See Also

Reference