Stream Processing with PostEvent and ReactiveX

ScaleOut Software NamedCache API

When objects stored in the ScaleOut distributed data grid are large or subject to more frequent updates than reads, it can be advantageous to use an event-based update model rather than a conventional CRUD (Create/Read/Update/Delete) access model. Sending events directly to the ScaleOut hosts where objects reside is often more efficient than pulling an entire object to a client, modifying it, and then sending the entire object back to the StateServer host via an update.

ScaleOut's StreamServer product introduces the PostEvent operation, a method that allows messages to be sent directly to the server where an object resides. This enables a stream-oriented approach to handling incoming events: once the event reaches the server where an object resides, it can be analyzed and/or persisted in the associated StateServer object.

On the server side, custom code that receives the event for analysis and storage can take advantage of ReactiveX operators to create sophisticated event handling pipelines and use time windowing functions to perform advanced analysis of an object's event stream.

Tip

A runnable version of code in this article can be found in the Visual Studio solution shipped in the ScaleOut installation package. The solution is located in the samples folder under your ScaleOut installation directory.

Posting an Event

Posting an event to the ScaleOut data grid involves calling the PostEvent method. The method takes the following parameters:

id

The key to the persisted object in the ScaleOut service that is associated with the event. To minimize network overhead, the event is sent directly to the ScaleOut host that contains this persisted state object. For example, if the ScaleOut service is storing stock price history objects and we are posting a price change for a stock, the id argument could be a key created from the stock's ticker symbol, which identifies the stock history object associated with the price change.

eventInfo

An optional, arbitrary string associated with the event. This string is typically used to identify the type of event being posted if more than one kind of event is being sent to the data grid. However, if the event can be entirely described by a string (say, as a JSON-serialized object), then the eventInfo string can serve as the payload of the event.

payload

An optional, arbitrary byte array containing the payload of the event (typically a serialized object).

invokeTimeout

The amount of time allowed for the PostEvent operation to complete on the server.
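
When an event can be described entirely by a string, eventInfo can carry it. The sketch below shows one way to do that with System.Text.Json (deserializing positional records needs .NET 5 or later); QuoteMessage and EventInfoJson are hypothetical names used only for illustration. On the server, the handler would parse ev.EventInfo instead of ev.Payload. The trade-off is that eventInfo can then no longer double as the event-type discriminator.

```csharp
using System;
using System.Text.Json;

// Hypothetical event type used only for this illustration.
public sealed record QuoteMessage(string Ticker, DateTimeOffset Timestamp, decimal Price, long Volume);

public static class EventInfoJson
{
    // Client side: produce the string that would be passed as PostEvent's eventInfo argument.
    public static string ToEventInfo(QuoteMessage quote) => JsonSerializer.Serialize(quote);

    // Server side: rebuild the quote from the event's EventInfo string.
    public static QuoteMessage FromEventInfo(string eventInfo) =>
        JsonSerializer.Deserialize<QuoteMessage>(eventInfo)
        ?? throw new ArgumentException("eventInfo did not contain a quote.", nameof(eventInfo));
}
```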

Server-side event handling code should not throw exceptions. If an exception does escape from your event handler, however, it is serialized and shipped back to the PostEvent caller, where it is wrapped and thrown as a MethodInvocationException.
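
One way to keep exceptions inside the server-side handler is to wrap it before subscribing. The sketch below is a generic helper; HandlerGuard, Guard and the onError callback are hypothetical names, not part of the ScaleOut API.

```csharp
using System;

public static class HandlerGuard
{
    // Wraps an event handler so that exceptions are reported through onError
    // instead of escaping to the caller. Hypothetical helper for illustration.
    public static Action<T> Guard<T>(Action<T> handler, Action<T, Exception> onError)
    {
        if (handler is null) throw new ArgumentNullException(nameof(handler));
        if (onError is null) throw new ArgumentNullException(nameof(onError));

        return item =>
        {
            try
            {
                handler(item);
            }
            catch (Exception ex)
            {
                // Record the failure on the server (trace, log, dead-letter store, etc.).
                onError(item, ex);
            }
        };
    }
}
```

In a pipeline such as the Setup method later in this article, the final step could then read `.Subscribe(HandlerGuard.Guard<StockQuote>(HandlePriceChangeEvent, (q, ex) => Trace.WriteLine(ex.Message)))`. The design choice is deliberate: the client no longer receives a MethodInvocationException, so failures must be monitored on the server side. Note that an exception thrown by onError itself would still escape.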

Calling the NamedCache's PostEvent method
static void Main(string[] args)
{
    NamedCache nc = CacheFactory.GetCache("price histories");
    ConfigureInvocationGrid(nc); // discussed below

    // Simulate an incoming stock quote:
    var quote = new StockQuote()
    {
        Ticker = "MSFT",
        Timestamp = new DateTimeOffset(2016, 12, 21, 17, 59, 0, TimeSpan.FromHours(-5)), // culture-independent
        Price = 63.54m,
        Volume = 15253897
    };

    // Ticker symbols are the keys to history objects in this named cache.
    // Use it as the key to the event to have it raised on the ScaleOut 
    // host where the MSFT history resides:
    var cacheKey = nc.CreateKey(quote.Ticker);

    // Send the quote to the Invocation Grid for processing:
    nc.PostEvent(id: cacheKey,
                  eventInfo: "Quote Event",
                  payload: quote.ToBytes());
}
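
The example above relies on a StockQuote type with ToBytes() and FromBytes() helpers that this article does not show, and the shipped sample may define them differently. The sketch below assumes a compact binary layout written with BinaryWriter; any serializer works as long as the client and the server-side handler agree on the format.

```csharp
using System;
using System.IO;

// Sketch of the StockQuote type assumed by the examples; the sample
// solution's actual definition may differ.
public class StockQuote
{
    public string Ticker { get; set; } = "";
    public DateTimeOffset Timestamp { get; set; }
    public decimal Price { get; set; }
    public long Volume { get; set; }

    // Serializes the quote for use as PostEvent's payload argument.
    public byte[] ToBytes()
    {
        using var stream = new MemoryStream();
        using (var writer = new BinaryWriter(stream))
        {
            writer.Write(Ticker);
            // Store the instant as UTC ticks plus the original offset in minutes.
            writer.Write(Timestamp.UtcTicks);
            writer.Write((short)Timestamp.Offset.TotalMinutes);
            writer.Write(Price);
            writer.Write(Volume);
        }
        // ToArray() remains valid after the writer has closed the stream.
        return stream.ToArray();
    }

    // Rebuilds a quote from an event's Payload bytes.
    public static StockQuote FromBytes(byte[] bytes)
    {
        using var reader = new BinaryReader(new MemoryStream(bytes));
        string ticker = reader.ReadString();
        long utcTicks = reader.ReadInt64();
        TimeSpan offset = TimeSpan.FromMinutes(reader.ReadInt16());
        decimal price = reader.ReadDecimal();
        long volume = reader.ReadInt64();

        return new StockQuote
        {
            Ticker = ticker,
            Timestamp = new DateTimeOffset(utcTicks, TimeSpan.Zero).ToOffset(offset),
            Price = price,
            Volume = volume
        };
    }
}
```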

Handling Event Streams in the ScaleOut In-Memory Data Grid

The arguments provided by the client to the PostEvent method are delivered to your server-side event handling code in a Soss.Client.Event object. This section covers how to host your server-side code and efficiently process these events.

Hosting Server-Side Code

Before clients post events, server-side event handling code must be launched on the system(s) hosting the ScaleOut service. At startup, this event handling app must subscribe to the event source (by calling Subscribe on the IObservable returned by GetEventSource) and then remain running indefinitely to process incoming events.

The ScaleOut service load balances the delivery of events to all hosts in your cluster, so the event handling process must be started and running on all ScaleOut hosts. There are two approaches to hosting your event handling logic:

  1. Write a long-running Windows Service that subscribes to the named cache's event source at startup and remains running indefinitely. See MSDN for details on how to create a Windows Service project. The service must be deployed and started on every ScaleOut host in your cluster.

  2. Use ScaleOut's Invocation Grid feature to deploy and host your event handling code in a .NET worker process that is launched and managed by the ScaleOut service. Clients are responsible for configuring the invocation grid before posting events. This involves associating a NamedCache instance with an invocation grid, specifying dependent assemblies to be shipped, and passing in a startup method that the invocation grid worker processes execute when the grid is first loaded.

    The startup Action delegate that you assign to the InvocationGridBuilder's Initializer property should subscribe to the observable event source returned by NamedCache.GetEventSource. This is where you can set up an Rx pipeline to handle any or all events posted by clients.

    Launching an Invocation Grid from a Client
    // Sets up and starts an "Invocation Grid" (.NET worker processes 
    // on ScaleOut hosts) to handle posted events.
    static void ConfigureInvocationGrid(NamedCache nc)
    {
        var igBuilder = new InvocationGridBuilder("Quote handler worker grid");
    
        // Specify the assembly containing our PriceChangeHandler
        // class to be shipped to worker processes on the server(s):
        igBuilder.AddDependency(System.Reflection.Assembly.GetExecutingAssembly());
    
        // Specify the setup routine to be run when the worker processes 
        // first start up on the farm. This routine is responsible for 
        // subscribing to the observable event source we're pushing to.
        igBuilder.Initializer = PriceChangeHandler.Setup;
    
        // Load() is idempotent and can be called safely from multiple client instances.
        var ig = igBuilder.Load();
    
        // Associate the invocation grid with our named cache of 
        // stock price histories. This causes NamedCache.PostEvent()
        // calls to be processed in the IG worker process:
        nc.InvocationGrid = ig;
    }
    Note

    It is safe for multiple client instances to load an invocation grid. If the grid is already loaded and running on the ScaleOut hosts, the call is ignored.

Handling the Event

Once subscribed to the observable event source, the server-side event handling pipeline receives a stream of the Soss.Client.Event objects that carry the arguments clients pass to PostEvent. Pipelines generally consist of the following steps:

  1. Route the event as needed, typically using the EventInfo property to discriminate different types of events.

  2. Deserialize the event object carried in the Payload property, if one exists.

  3. Retrieve the object associated with the event from the local ScaleOut service.

  4. Perform any analysis and modify the state of the associated object.

  5. Persist the modified state back to the ScaleOut service.
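
The five steps can be traced end to end without a ScaleOut installation. The sketch below is a single-process illustration only: a hypothetical PostedEvent record stands in for Soss.Client.Event, a Dictionary stands in for the named cache, and LINQ over an IEnumerable stands in for the Rx operators on the observable event source. Locking and distribution are deliberately left out.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;

// Hypothetical stand-in for Soss.Client.Event (only the properties used here).
public sealed record PostedEvent(string EventInfo, byte[] Payload);

public static class PipelineSketch
{
    // Stand-in for the named cache: ticker symbol -> price history.
    public static readonly Dictionary<string, List<decimal>> Store = new();

    public static void Process(IEnumerable<PostedEvent> events)
    {
        var quotes = events
            // 1. Route: keep only the event type this handler understands.
            .Where(ev => ev.EventInfo == "Quote Event")
            // 2. Deserialize: in this sketch the payload is "TICKER:PRICE" in UTF-8.
            .Select(ev => Parse(Encoding.UTF8.GetString(ev.Payload)));

        foreach (var (ticker, price) in quotes)
        {
            // 3. Retrieve (or create) the state associated with the event.
            if (!Store.TryGetValue(ticker, out var history))
                history = new List<decimal>();

            // 4. Analyze, then modify the state.
            if (history.Count > 0 && price / history[history.Count - 1] < 0.8m)
                Console.WriteLine($"Warning: {ticker} dropped more than 20%.");
            history.Add(price);

            // 5. Persist the modified state.
            Store[ticker] = history;
        }
    }

    private static (string Ticker, decimal Price) Parse(string text)
    {
        var parts = text.Split(':');
        return (parts[0], decimal.Parse(parts[1], CultureInfo.InvariantCulture));
    }
}
```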

The following sections will cover these steps in depth.

Subscribing and Routing

The extension methods in the System.Reactive.Linq NuGet package offer operators that can be used to subscribe to and route events emitted from the observable source returned by NamedCache.GetEventSource(). In the following example, Rx operators are used to set up a pipeline that performs the following actions:

  1. The Where() method filters events so that only "Quote Event" messages are routed to our handler.

  2. The Select() method is used to transform the payload in the event from a serialized byte array into an instance of a StockQuote.

  3. The Subscribe() method is used to hook up our HandlePriceChangeEvent method, which contains our analysis logic (and is defined later in this article).

Setting up an Event Handling Pipeline
// Named cache used to access price history objects in the ScaleOut service.
static readonly NamedCache _nc = CacheFactory.GetCache("price histories");

// Initialization method to run when a distributed invocation grid
// is first loaded.
public static void Setup()
{
    // Set up a ReactiveX pipeline to deserialize incoming 
    // stock quote events and push them into our HandlePriceChangeEvent
    // method. See http://reactivex.io/.

    // GetEventSource() returns an IObservable<Soss.Client.Event>:
    _nc.GetEventSource()
        // This subscriber is only interested in stock quote events:
        .Where(ev => ev.EventInfo == "Quote Event")
        // Extract the price quote from the event's payload:
        .Select(ev => StockQuote.FromBytes(ev.Payload))
        // Append to stock's price history and do analysis:
        .Subscribe(HandlePriceChangeEvent);
}
Note

The method in the code listing above should either be executed when your server process starts or, if you use an invocation grid, be assigned to the InvocationGridBuilder.Initializer property as a startup action (as in the ConfigureInvocationGrid example).
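
When one event source carries several kinds of events, a separate Where() clause per event type works, and so does a single dispatch table keyed by EventInfo. The sketch below shows the latter; EventRouter and the event names are hypothetical. In an Rx pipeline it could be hooked up with something like `.Subscribe(ev => router.Dispatch(ev.EventInfo, ev.Payload))`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical dispatch table that routes events by their EventInfo string.
public sealed class EventRouter
{
    private readonly Dictionary<string, Action<byte[]>> _handlers = new(StringComparer.Ordinal);

    // Registers the handler for one kind of event; returns this for chaining.
    public EventRouter On(string eventInfo, Action<byte[]> handler)
    {
        _handlers[eventInfo] = handler ?? throw new ArgumentNullException(nameof(handler));
        return this;
    }

    // Invokes the matching handler. Returns false for missing or unrecognized
    // event types so the caller can log them instead of throwing back to the client.
    public bool Dispatch(string eventInfo, byte[] payload)
    {
        if (eventInfo is null || !_handlers.TryGetValue(eventInfo, out var handler))
            return false;
        handler(payload);
        return true;
    }
}
```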

Accessing Persisted State

The NamedCache.Retrieve method retrieves the persisted state associated with the event so that it can be used while analyzing the incoming event. In the example below, a RetrieveOptions argument transparently creates the stock history object in the ScaleOut service if one doesn't already exist for the ticker symbol, and locks the object when it is read:

Retrieving and Locking Persisted State
// Helper method to read the stock history from the ScaleOut
// service, locking it.
static LinkedList<StockQuote> RetrieveHistory(string tickerSymbol)
{
    // Set up retrieval options to create a new history collection
    // transparently in the ScaleOut service if one doesn't already exist:
    var opt = new RetrieveOptions(lockingMode: ReadLockingMode.LockOnRead,
                                  geoServerReadMode: GeoServerReadMode.None,
                                  createHandler: (key, arg) => new LinkedList<StockQuote>(),
                                  createArgument: null,
                                  createPolicy: _nc.DefaultCreatePolicy);

    return _nc.Retrieve(tickerSymbol, opt) as LinkedList<StockQuote>;
}
Note

Because RetrieveHistory uses ReadLockingMode.LockOnRead, the object holding the persisted state remains exclusively locked in the ScaleOut service until analysis and updating of the state are complete.

Analysis and Updating State

Analysis of incoming events can involve any custom logic needed to address the problem domain. In the example below, the incoming stock quote is first compared with the most recent quote in the stock's history to detect a sudden price drop, and is then appended to the history.

Analyzing an Event and Updating State
// Adds incoming quote to a stock's history,
// raising an alert if needed.
static void HandlePriceChangeEvent(StockQuote stockQuote)
{
    // Retrieve the price history associated with this stock:
    LinkedList<StockQuote> history = RetrieveHistory(stockQuote.Ticker);

    // This is a convenient location to run analysis.
    // For example, raise alert for precipitous price drops:
    if (history.Count > 0 && stockQuote.Price / history.Last.Value.Price < 0.8m)
        Trace.WriteLine("Warning: price drop exceeds 20%.");

    // Add the incoming quote to the history:
    history.AddLast(stockQuote);

    // Trim the history to 100 of the most recent quotes:
    if (history.Count > 100) history.RemoveFirst();

    // Persist updated history to ScaleOut service:
    _nc.Update(stockQuote.Ticker, history, unlockAfterUpdate: true);
}

In streaming applications, processing events frequently involves transforming the events with a time windowing function and then applying an aggregate function to each window. (For example, moving averages are often used in the technical analysis of equities.) ScaleOut Software provides an open source time windowing library that is well suited for the stateful event processing model supported by ScaleOut StreamServer.
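
As a generic illustration of windowing plus aggregation, the sketch below computes a simple moving average over a sliding, count-based window and per-interval averages over tumbling time windows, using plain LINQ and loops. It is not the API of ScaleOut's time windowing library; PricePoint and TimeWindows are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a stored quote.
public sealed record PricePoint(DateTimeOffset Timestamp, decimal Price);

public static class TimeWindows
{
    // Sliding window: the average of each run of windowSize consecutive prices.
    public static IEnumerable<decimal> MovingAverage(IReadOnlyList<PricePoint> points, int windowSize)
    {
        if (windowSize <= 0) throw new ArgumentOutOfRangeException(nameof(windowSize));
        decimal sum = 0m;
        for (int i = 0; i < points.Count; i++)
        {
            sum += points[i].Price;                                  // enter the window
            if (i >= windowSize) sum -= points[i - windowSize].Price; // leave the window
            if (i >= windowSize - 1) yield return sum / windowSize;
        }
    }

    // Tumbling window: non-overlapping buckets aligned to UTC multiples of
    // the window length, each reduced to its average price.
    public static IEnumerable<(DateTimeOffset Start, decimal Average)> TumblingAverages(
        IEnumerable<PricePoint> points, TimeSpan window)
    {
        if (window <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(window));
        return points
            .GroupBy(p => p.Timestamp.UtcTicks / window.Ticks)
            .OrderBy(g => g.Key)
            .Select(g => (new DateTimeOffset(g.Key * window.Ticks, TimeSpan.Zero),
                          g.Average(p => p.Price)));
    }
}
```

A sliding window reuses the running sum, so each new quote costs constant work; tumbling windows are simpler to persist because each completed bucket never changes again.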

Once analysis is complete, your pipeline is responsible for updating the persisted state stored in the local ScaleOut service with a call to NamedCache.Update. Because HandlePriceChangeEvent passes unlockAfterUpdate: true, the update call also releases the exclusive lock on the persisted stock history object in the ScaleOut service.