Creating a Message Module Project

Use the scaleout-msg-module archetype to create a API Module project.

Background

In this example, we will use the com.scaleoutsoftware.archetypes:scaleout-msg-module archetype to create and customize a MSG module for a flight object. The Message processor in a message module is responsible for persisting, analyzing, and reacting to incoming messages.

Prerequisites

  • Java 8 or higher.

  • Maven

Procedure

1. Create API module project

To create a message module Maven project, run the following command in your terminal:

mvn archetype:generate \
  -DarchetypeGroupId=com.scaleoutsoftware.archetypes \
  -DarchetypeArtifactId=scaleout-msg-module \
  -DarchetypeVersion=1.0.0 \
  -DgroupId=com.mycompany.flighttracking \
  -DartifactId=FlightTracker \
  -DsossObjectName=Flight \
  -DinteractiveMode=false

What These Options Do

Property

Description

groupId

Your organization or namespace

artifactId

Name of the generated project directory + artifact id

sossObjectNameName

The class representing each Flight

This command generates a new folder (FlightTracker) containing:

  • Java class stubs for your message module

  • A sample working unit test demonstrating sending a message

  • A pre-configured scaleoutPackage.json metadata file

  • A Maven project file (pom.xml) with preconfigured dependencies

  • A build pipeline for packaging your project for deployment

2. Customize state object

Customize the SOSS object used to store data for your module. A skeleton class named “Flight” was created by the archetype and must be modified to hold state for the module (in this case, a list of items in the shopping cart). Instances of this object will be stored in the ScaleOut StateServer cluster.

package com.mycompany.flighttracking;

import java.util.Iterator;
import java.util.List;

public class Flight {
    long arrivalTimeMs;
    List<Passenger> passengerList;
    // For JSON serialized objects, an Id field is required to enable 
    // queries from the ActiveCaching UI. 
    String Id;

    public Flight(String flightId) {
        Id = flightId;
    }

    public void updateArrivalTime(long newArrivalTimeMs) {
        arrivalTimeMs = newArrivalTimeMs;
    }

    public long getArrivalTimeMs() {
        return arrivalTimeMs;
    }
}
  • SOSS objects should also contain a string property named Id. While not strictly required, the Id property is used by the ScaleOut Active Caching UI to identify objects in the cluster for queries.

3. Implement the CreateObject() method

Implement the CreateObject method in your MessageProcessor. A skeleton class named “FlightMessageProcessor” was precreated by the archetype and must be modified with a concrete CreateObject() implementation. This method is called when an API invocation is received and the associated SOSS object does not yet exist.

public CreateResult<Flight> createObject(String moduleName, String id) {
    return new CreateResult<Flight>() {
        @Override
        public Flight getValue() {
            return new Flight(id);
        }
    };
}
  • The CreateResult instance returned by the createObject() method will have the following three properties set:

    • Value: The newly created SOSS object (in this case, a shopping cart whose Items property is an empty list).

    • Expiration: A Duration indicating how long the object should be kept in the cluster before being automatically removed. By default, Duration.ofSeconds(0) – infinite timeout.

    • ExpirationType: A enum value indicating the expiration behavior for the object. Set to Sliding to reset the expiration every time the SossObject is accessed, or Absolute to keep it for a fixed duration. By default, ExpirationType.Sliding.

4. Implement the message module’s processMessage method

The message module’s processMessage method is implemented in the FlightMessageProcessor class to handle incoming messages.

The processMessage method accepts an arbitrary message as a byte array.

public ProcessingResult processMessage(MsgProcessingContext<Flight> MsgProcessingContext, Flight flightSossObject, byte[] message) {
    try {
        ArrivalTimeMessage msg = ArrivalTimeMessage.deserialize(message);
        flightSossObject.updateArrivalTime(msg.getArrivalTimeMs());
    } catch (Exception e) {
        // Catch all exceptions and send an alert using the UI alerter.
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        e.printStackTrace(pw);
        pw.flush();
        sw.flush();
        MsgProcessingContext.sendUiAlert(AlertSeverity.Error, "Exception thrown by id" + MsgProcessingContext.getObjectId() + " " + pw.toString());
    }
    // Return ProcessingResult.DoUpdate if this method modified the SOSS object.
    // If no changes occurred or the changes are to be discarded, return ProcessingResult.NoUpdate.
    // To remove the SOSS object, return ProcessingResult.Remove;
    return ProcessingResult.DoUpdate;
}
  • The processMessage method must return ProcessingResult enum. The following methods should be implemented to return concrete values from the invocation:

    • DoUpdate: Indicates that the object was (or may have been) modified and must be updated in the ScaleOut service.

    • NoUpdate: Indicates the object was not modified and does not need to be updated in the ScaleOut service. (If you are unsure of whether the object was modified, always return DoUpdate.)

    • Remove: Remove the associated SossObject from the ScaleOut cluster.

5. Send messages

Clients can send messages to the message module through the MsgSender, MsgModuleClient Java APIs, connections, or REST calls.

The MsgSender is used to send serialized messages to a message module. In addition to being able to send serialized messages to a message module, the MsgModuleClient is provides access to the Cache holding the message module SOSS objects.

The projects created unit test demonstrates running the ModulePackage locally and sending a message to the message module using a development MsgSender client.

// instantiate the module package
ModulePackage modulePackage = new ModulePackage();
// define the MsgModuleOptions
MsgModuleOptions<Flight> msgModuleOptions = new MsgModuleOptionsBuilder<Flight>(Flight.class).build();
// add the MSG module to the package
modulePackage.addMsgModule("Flight", new FlightMessageProcessor(), msgModuleOptions);
// run a local development package
modulePackage.runLocalDevelopmentEnvironment();

// instantiate a development MsgModuleClient
MsgModuleClient<Flight> client = new MsgModuleClientBuilder<Flight>("Flight", Flight.class).buildLocalDevelopmentClient();

// send a message with an expected value of 5
long expected = System.currentTimeMillis();
String flightId = "Flight"+ UUID.randomUUID().toString();
client.sendToModule(flightId, ArrivalTimeMessage.serialize(new ArrivalTimeMessage(expected))).get();

Cache<String,Flight> cache = client.getCacheForModule();
CacheResponse<String,Flight> response = cache.read(flightId);
Assert.assertSame(RequestStatus.ObjectRetrieved, response.getStatus());
Assert.assertNotNull(response.getValue());
Assert.assertEquals(expected, response.getValue().getArrivalTimeMs());
  • To send a message to a module, use the com.scaleoutsoftware.modules:client library and build a MsgSender or MsgModuleClient.

  • Call sendToModule(), supplying the target SOSS object’s Id and the message as a byte[].

6. State Object Serialization

In Java, by default the SOSS state object is serialized using JSON serialization with the GSON serialization library.

To use a custom serializer, create an implementation of the CacheSerializer and CacheDeserializer. The following code snippets demonstrate using the Kryo serialization protocol for the Flight state object.

public class KryoSerializer extends CacheSerializer<Flight> {
    private final Kryo kryo;
    public KryoSerializer() {
        kryo = new Kryo();
        kryo.register(Flight.class);
    }

    @Override
    public byte[] serialize(Flight flight) throws SerializationException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (Output output = new Output(bos)) {
            kryo.writeObject(output, flight);
        }
        return bos.toByteArray();
    }
}
public class KryoDeserializer extends CacheDeserializer<ShoppingCart> {
    private final Kryo kryo;
    public KryoDeserializer() {
        kryo = new Kryo();
        kryo.register(Flight.class);
    }

    @Override
    public ShoppingCart deserialize(byte[] bytes) throws DeserializationException {
        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
        try (Input input = new Input(bis)) {
            return kryo.readObject(input, ShoppingCart.class);
        }
    }

Configure the ModuleOptions to use the newly defined custom serializer classes by instantiating the KryoSerializer and KryoDeserializer and setting them on the ModuleOptionsBuilder. Then, associate the ModuleOptions with the ModulePackage when adding the Flight module.

MsgModuleOptions<Flight> msgModuleOptions = new MsgModuleOptionsBuilder<Flight>(Flight.class)
        .setSerialization(new KryoSerializer(), new KryoDeserializer(), false)
        .build();
// add the MSG module to the package
modulePackage.addMsgModule("Flight", new FlightMessageProcessor(), msgModuleOptions);