Invoke and Merge

ScaleOut Software NamedCache API

In addition to the standard LINQ operators defined in the Queryable class, the ScaleOut LINQ provider also recognizes the operators defined in the QueryableExtensions class, which let you use LINQ queries with StateServer's parallel Invoke facility.

The AsFilter Method

The NamedCache's Invoke and Query methods accept an IFilter parameter to restrict the set of objects the methods operate on or return. The AsFilter<T> method lets you convert an IQueryable<T> result generated by QueryKeys<T> or QueryObjects<T> into an IFilter object suitable for use with these methods.

For example, consider the following classes where a (stock) Portfolio holds a list of Positions in various stocks. Note that the Portfolio class implements ITaggable and that when Portfolio.Add adds a Position to the Portfolio, it also adds the Ticker corresponding to the stock as a Tag for the Portfolio.

using System;
using System.Collections.Generic;

[Serializable]
public struct Position
{
    public string Ticker { get; set; }
    public string Company { get; set; }
    public decimal Quantity { get; set; }
    public DateTime PurchaseDate { get; set; }
    public decimal Cost { get; set; }
    public decimal CurrentPrice { get; set; }
}

[Serializable]
public class Portfolio : ITaggable
{
    // Name of the NamedCache that stores Portfolio objects (hypothetical value).
    private const string CacheName = "Portfolios";

    public Portfolio(string owner)
    {
        Owner = owner;
        Positions = new List<Position>();
    }

    [SossIndex]
    public string Owner { get; private set; }

    public IList<Position> Positions { get; private set; }

    // Adds the position and tags this Portfolio with the position's ticker,
    // so the portfolio can later be selected with HasAnyTag.
    public void Add(Position position)
    {
        Positions.Add(position);
        this.AddTags(position.Ticker);
    }

    // ITaggable members, implemented explicitly.
    SparseBitmap ITaggable.TagHolder { get; set; }
    NamedCache ITaggable.NamedCache
    {
        get { return CacheFactory.GetCache(CacheName); }
    }
}

Suppose we'd like to calculate the value of all holdings of a particular list of stocks. Since Tags may be queried on StateServer, we can construct a LINQ query that returns only those Portfolio instances that contain the Tickers of the stocks we're interested in as shown below.

HashSet<string> tickersToProcess = GetTickersToProcess();

var q = from pf in cache.QueryObjects<Portfolio>()
        where pf.HasAnyTag(tickersToProcess)
        select pf;
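The query asks StateServer for Portfolio objects that carry at least one of the requested tags. As a rough local model of that selection (not the server-side implementation), the sketch below assumes a hypothetical LocalPortfolio type that keeps its tags in a HashSet<string>, and uses HashSet<T>.Overlaps in place of HasAnyTag:

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for Portfolio: an owner plus the tickers
// that Portfolio.Add would have attached as tags.
public sealed class LocalPortfolio
{
    public LocalPortfolio(string owner, params string[] tags)
    {
        Owner = owner;
        Tags = new HashSet<string>(tags);
    }

    public string Owner { get; }
    public HashSet<string> Tags { get; }
}

public static class TagFilterSketch
{
    // Local analogue of "where pf.HasAnyTag(tickersToProcess)": keep a
    // portfolio if at least one of its tags is in the requested set.
    public static List<string> OwnersWithAnyTag(
        IEnumerable<LocalPortfolio> portfolios, ISet<string> tickers)
    {
        return (from pf in portfolios
                where pf.Tags.Overlaps(tickers)
                select pf.Owner).ToList();
    }
}
```

For example, given portfolios tagged {GOOG, MSFT}, {IBM} and {MSFT}, asking for {MSFT} selects the first and third.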

Using the AsFilter<T> method, we can pass the query we created above as the IFilter argument, as shown here:

decimal valueOfSelectedStocks =
    cache.Invoke<Portfolio, HashSet<string>, decimal>(
        q.AsFilter(),
        tickersToProcess,
        (pfolio, selectedTickers) =>
            (from pos in pfolio.Positions
             where selectedTickers.Contains(pos.Ticker)
             select pos.Quantity * pos.CurrentPrice).Sum(),
        (v1, v2) => v1 + v2,
        NamedCache.InfiniteInvokeTimeout);

Console.WriteLine("Value of stocks in all portfolios: {0}",
                  valueOfSelectedStocks);

We pass the tickersToProcess HashSet<string> as the param argument of Invoke<T, P, R>.

In this case, the evalMethod parameter is expressed as a lambda expression (the third argument) that selects the Positions within the Portfolio whose Ticker is in the tickersToProcess set, multiplies each Position's Quantity by its CurrentPrice, and sums the results across all selected Positions.

The mergeMethod (the fourth argument, (v1, v2) => v1 + v2) merges the results of the evalMethod invocations by adding them together.
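To make the split between evalMethod and mergeMethod concrete, here is a local sketch of what the Invoke call computes; it is not StateServer code. It assumes a hypothetical Holding struct carrying only the three fields the evalMethod reads, and uses PLINQ to stand in for the servers: Eval runs on each portfolio independently, and the partial results are folded together with Merge.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for Position, reduced to the fields Eval reads.
public readonly struct Holding
{
    public Holding(string ticker, decimal quantity, decimal currentPrice)
    {
        Ticker = ticker;
        Quantity = quantity;
        CurrentPrice = currentPrice;
    }

    public string Ticker { get; }
    public decimal Quantity { get; }
    public decimal CurrentPrice { get; }
}

public static class EvalMergeSketch
{
    // evalMethod analogue: value of the selected holdings in one portfolio.
    public static decimal Eval(IList<Holding> portfolio, ISet<string> selected) =>
        (from pos in portfolio
         where selected.Contains(pos.Ticker)
         select pos.Quantity * pos.CurrentPrice).Sum();

    // mergeMethod analogue: combines two partial results.
    public static decimal Merge(decimal v1, decimal v2) => v1 + v2;

    // Evaluates every portfolio in parallel and merges the partial sums.
    // DefaultIfEmpty(0m) keeps Aggregate from throwing on an empty input.
    public static decimal Run(IEnumerable<IList<Holding>> portfolios, ISet<string> selected) =>
        portfolios
            .AsParallel()
            .Select(pf => Eval(pf, selected))
            .DefaultIfEmpty(0m)
            .Aggregate((v1, v2) => Merge(v1, v2));
}
```

Because PLINQ may combine partial results in any order, the function passed to Aggregate should be associative and commutative; addition is both.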

By using the LINQ query to filter the objects sent to the evalMethod, we ensure that only objects of the right type (Portfolio objects) that hold a position in at least one of the selected stocks are evaluated by the evalMethod.
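A filter built from a typed query therefore does two jobs: it rejects objects of any other type, and it applies the query's predicate. The sketch below models that idea locally with a hypothetical ToUntyped helper that turns a Func<T, bool> into a Func<object, bool>; it illustrates the behavior and is not how AsFilter is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AsFilterSketch
{
    // Hypothetical analogue of AsFilter: wraps a strongly typed predicate so it
    // can be applied to untyped objects. Objects that are not a T are rejected
    // before the predicate ever sees them.
    public static Func<object, bool> ToUntyped<T>(Func<T, bool> predicate) =>
        obj => obj is T typed && predicate(typed);

    // Applies an untyped filter to a heterogeneous collection, as a store
    // holding several object types would require.
    public static List<object> Apply(IEnumerable<object> objects, Func<object, bool> filter) =>
        objects.Where(filter).ToList();
}
```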

Calling Invoke Using the LINQ Pattern

The StateServer LINQ provider offers an alternative way of expressing parallel method invocation that more closely follows the LINQ programming pattern. The Invoke family of methods lets you specify the evalMethod for an Invoke operation. If your evalMethod has a non-void return value, you must also use the Merge<R> method to specify the mergeMethod.

Note

When you use Merge<R>, it must immediately follow the Invoke method and consume its result. No other LINQ operators may appear between Invoke and Merge.

The example above can be rewritten using the LINQ pattern as follows:

HashSet<string> tickersToProcess = GetTickersToProcess();

decimal valueOfSelectedStocks =
    (from pf in cache.QueryObjects<Portfolio>()
     where pf.HasAnyTag(tickersToProcess)
     select pf)

    .Invoke(NamedCache.InfiniteInvokeTimeout,
        tickersToProcess,
        (pfolio, selectedTickers) =>
            (from pos in pfolio.Positions
             where selectedTickers.Contains(pos.Ticker)
             select pos.Quantity * pos.CurrentPrice).Sum())

    .Merge((v1, v2) => v1 + v2);

Console.WriteLine("Value of stocks in all portfolios: {0}",
                  valueOfSelectedStocks);

There is no semantic difference between the code using the LINQ pattern and the code that calls the NamedCache.Invoke method directly. However, because the IFilter parameter is not strongly typed, the compiler cannot infer the target object type for NamedCache.Invoke. In the LINQ pattern, the compiler can infer the target object type from the IQueryable<Portfolio> data source, can infer the type of the param object, and in this case can also infer the evalMethod's return type. Consequently, we didn't need to specify type arguments for the Invoke<T, P, R> method.
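The inference difference can be reproduced with ordinary generic methods. In the sketch below, UntypedFilter, InvokeWithFilter and LocalInvoke are hypothetical stand-ins, not ScaleOut APIs. Because the filter carries no element type, InvokeWithFilter needs explicit type arguments, whereas the extension method LocalInvoke picks up T from its IEnumerable<T> source, P from the param argument, and R from the lambda's return type.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical untyped filter, standing in for IFilter: it says nothing about
// the element type it selects.
public sealed class UntypedFilter
{
    public UntypedFilter(Func<object, bool> predicate) => Predicate = predicate;
    public Func<object, bool> Predicate { get; }
}

public static class InferenceSketch
{
    // Filter style: T appears only as the input of an implicitly typed lambda,
    // so it cannot be inferred and callers must write <T, P, R> explicitly.
    public static IEnumerable<R> InvokeWithFilter<T, P, R>(
        IEnumerable<object> store, UntypedFilter filter, P param, Func<T, P, R> eval) =>
        store.Where(filter.Predicate).OfType<T>().Select(item => eval(item, param));

    // LINQ style: T is fixed by the source, P by param, R by the lambda body.
    public static IEnumerable<R> LocalInvoke<T, P, R>(
        this IEnumerable<T> source, P param, Func<T, P, R> eval) =>
        source.Select(item => eval(item, param));
}
```

Dropping the explicit <string, int, int> from a call such as InvokeWithFilter<string, int, int>(store, filter, 0, (s, p) => s.Length + p) produces compiler error CS0411, because nothing in the arguments fixes T.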

Because the LINQ pattern builds on an idiom readers already know and makes the data flow from the query to the eval method to the merge method explicit, you may find it easier to read and write than calling Invoke directly.

QueryKeys and Invoke

When you use the QueryableExtensions version of the Invoke method, its "target" type parameter T must be the same as the type argument of the IQueryable<T> that serves as the data source for Invoke.

When Invoke's data source is QueryObjects<T>, the "target" type parameter for Invoke is the same as the generic type argument specified in QueryObjects<T>. However, QueryKeys<T> returns a query over CachedObjectId<T> values, so the "target" type parameter for Invoke must be CachedObjectId<T>.

Normally, the parallel method invocation infrastructure expects the "target" type of the evalMethod to refer to an object in the store, and it fetches and deserializes that object before calling the evalMethod. In this version of StateServer, if the target type is CachedObjectId<T>, the infrastructure simply passes the CachedObjectId<T> to the evalMethod. The evalMethod can then decide whether to fetch the object from the store, depending on what it needs to do.
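The practical effect is that a key-based evalMethod pays for deserialization only when it actually asks for the object. The local sketch below illustrates that trade-off; LocalStore and InvokeOnKeys are hypothetical names, and the store keeps values as UTF-8 bytes so that Fetch has a real decoding step to count.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical in-memory store: values are kept in serialized form, and
// Fetch counts how many times an object had to be deserialized.
public sealed class LocalStore
{
    private readonly Dictionary<string, byte[]> _data = new Dictionary<string, byte[]>();

    public int Deserializations { get; private set; }

    public void Put(string key, string value) => _data[key] = Encoding.UTF8.GetBytes(value);

    public IEnumerable<string> Keys => _data.Keys;

    public string Fetch(string key)
    {
        Deserializations++;
        return Encoding.UTF8.GetString(_data[key]);
    }
}

public static class KeyTargetSketch
{
    // Like Invoke over QueryKeys: eval receives only the key and decides for
    // itself whether to fetch the object. Throws on an empty store, as
    // Enumerable.Aggregate does without a seed.
    public static R InvokeOnKeys<R>(LocalStore store, Func<string, R> eval, Func<R, R, R> merge) =>
        store.Keys.Select(eval).Aggregate(merge);
}
```

Counting with eval = key => 1 never calls Fetch, while an eval that inspects the value deserializes every object once.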

For example, if you wanted to use parallel method invocation to simply count the number of portfolios containing the stock "GOOG", you might execute the following:

int countOfStocks = (from key in cache.QueryKeys<Portfolio>()
                     where key.Value.HasAnyTag("GOOG")
                     select key)

    .Invoke(NamedCache.InfiniteInvokeTimeout,
        0,    // no parameter needed; pass zero as a placeholder
        (key, param) => 1)    // each matching key contributes 1

    .Merge((v1, v2) => v1 + v2);    // add the partial counts

Console.WriteLine("Count of portfolios containing the stock GOOG: {0}",
                  countOfStocks);

This approach should produce the same count as the following query:

int countOfStocks = (from key in cache.QueryKeys<Portfolio>()
                     where key.Value.HasAnyTag("GOOG")
                     select key).Count();

Console.WriteLine("Count of portfolios containing the stock GOOG: {0}",
                  countOfStocks);

However, with parallel method invocation the counting happens in parallel on the distributed servers, and only each server's partial sum is sent over the network. Since we only need to count the objects, we neither need nor want to deserialize them for processing in the evalMethod. With the Count approach, all of the keys satisfying the query are sent to the client, where they are merged and counted.
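The network difference can be seen in a small local model. In this sketch, each inner list is a hypothetical server's set of matching keys; DistributedCountSketch and its ValuesSent tally are illustrative assumptions, not a measurement of StateServer traffic. Counting on the servers ships one number per server, while the client-side count ships every key.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class DistributedCountSketch
{
    // Each partition counts locally (eval returns 1 per key, merged by
    // addition), so only one partial count per server is "sent".
    public static (int Count, int ValuesSent) CountOnServers(
        IReadOnlyList<IReadOnlyList<string>> partitions)
    {
        int[] perServer = partitions
            .AsParallel()
            .Select(keys => keys.Select(_ => 1).Aggregate(0, (v1, v2) => v1 + v2))
            .ToArray();
        return (perServer.Sum(), perServer.Length);
    }

    // Client-side alternative: every matching key is shipped to the client,
    // which then counts them.
    public static (int Count, int ValuesSent) CountOnClient(
        IReadOnlyList<IReadOnlyList<string>> partitions)
    {
        List<string> allKeys = partitions.SelectMany(keys => keys).ToList();
        return (allKeys.Count, allKeys.Count);
    }
}
```

Both methods return the same count; they differ only in how many values cross the (simulated) network.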

Note

In some future release, the Count() LINQ operator is likely to be supported directly in the server.
