Posting an Event

Posting an event to the ScaleOut data grid involves calling the postEvent method. The method takes the following parameters:

  • key: The key to the object in ScaleOut service that is associated with the event. To minimize network overhead, the event will be sent directly to the ScaleOut host that contains this state object. For example, if the ScaleOut service is storing stock price history objects and we are posting a price change to a stock, the ID argument here could be the ticker symbol of the stock history object associated with the price change.

  • eventInfo: An optional, arbitrary string associated with the event. This string is typically used to identify the type of event being posted if more than one kind of event is sent to and handled by the data grid. However, if the event can be entirely described by a string (say, as a JSON-serialized object) then the eventInfo string can serve as the payload of the event.

  • payload: An optional, arbitrary byte array containing the payload of the object (typically a serialized object).

  • invokeTimeout: The amount of time allowed for the PostEvent operation to complete on the server.

Sample Event Payload

Consider an application that streams stock prices as events to be stored and analyzed. The following StockQuote class would be defined in a shared project so that both the streaming client and server-side event handling logic can access it. In this example, these StockQuote instances will be used both as an event payload and also in the ScaleOut service as elements in an equity’s price history.

Note

this sample in it’s entirety is available on our GitHub Samples Repo.

package com.scaleout.client.samples.streaming;

import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import java.io.IOException;

public class StockQuote {
    private final  String _ticker;
    private final double _price;
    public final long _volume;
    public final long _timestampMs;

    public StockQuote(String ticker, double price, long volume, long timestampMs) {
        _ticker = ticker;
        _price = price;
        _volume = volume;
        _timestampMs = timestampMs;
    }

    public String getTicker() {
        return _ticker;
    }

    public double getPrice() {
        return _price;
    }

    public long getVolume() {
        return _volume;
    }

    public long getTimestampMs() {
        return _timestampMs;
    }

    public static byte[] toBytes(StockQuote stockQuote) throws IOException {
        MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
        packer.packString(stockQuote.getTicker())
                .packDouble(stockQuote.getPrice())
                .packLong(stockQuote.getVolume())
                .packLong(stockQuote.getTimestampMs())
                .close();
        return packer.toByteArray();
    }

    public static StockQuote fromBytes(byte[] bytes) throws IOException {
        MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes);
        String ticker = unpacker.unpackString();
        double price = unpacker.unpackDouble();
        long volume = unpacker.unpackLong();
        long timestampMs = unpacker.unpackLong();
        unpacker.close();
        return new StockQuote(ticker, price, volume, timestampMs);
    }
}

Using Post Event

In this sample, the ScaleOut service is storing stock history objects, (a linked list of stock quotes), keyed by ticker symbol. Rather than using the traditional method of moving the entire history object over the network to modify it, we use postEvent to push a single stock quote to the ScaleOut host that holds the stock’s history.

package com.scaleout.client.samples.streaming;

import com.scaleout.client.GridConnectException;
import com.scaleout.client.GridConnection;
import com.scaleout.client.caching.Cache;
import com.scaleout.client.caching.CacheBuilder;
import com.scaleout.client.caching.CacheException;
import com.scaleout.client.caching.InvokeResponse;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.LinkedList;

public class PostEventClientSample {
    public static void main(String[] args) throws GridConnectException, IOException, CacheException {
        GridConnection connection = GridConnection.connect("bootstrapGateways=localhost");
        Cache<String, LinkedList<StockQuote>> cache = new CacheBuilder<String, LinkedList<StockQuote>>(connection, "PostEventSample", String.class)
                .customSerialization(new StockQuoteSerializers.StockQuotesMessagePackSerializer(), new StockQuoteSerializers.StockQuotesMessagePackDeserializer())
                .build();

        // Simulate an incoming stock quote:
        var quote = new StockQuote("MSFT",126.90, 14330868, System.currentTimeMillis());

        // Post the quote to the ScaleOut service holding this stock's history.
        // Here, ticker symbols are the keys to history objects in the cache.
        // Use it as the key for the post operation to cause the event to be
        // raised on the ScaleOut where the MSFT history resides
        InvokeResponse response = cache.postEvent(quote.getTicker(), "stockQuote", StockQuote.toBytes(quote), Duration.ofSeconds(2));
        switch (response.getRequestStatus()) {
            case EventPosted:
                System.out.println("Event was successfully posted.");
                break;
            case UnhandledExceptionInCallback:
                System.out.println("Unhandled exception thrown from Posted Event Handler.");
                System.out.println(new String(response.getErrorData(), StandardCharsets.UTF_8));
                break;
        }
    }
}