Define the Message Processor

The MessageProcessor processes incoming messages from each data source for the corresponding instance of a real-time digital twin model. A message processor extends the MessageProcessor abstract class. When a message arrives from a data source, the ScaleOut Digital Twins™ service will call the processMessages method and supply the following parameters: a reference to the instance’s state object for this data source, a list of incoming messages, and a reference to the instance’s ProcessingContext, which provides helper methods and properties.

The following Thermostat message processor handles temperature change messages. If the current temperature exceeds the max temperature, the twin sends a message to the datasource of the current temperature of the thermostat.

// RealTimeThermostatMessageProcessor.java

public class RealTimeThermostatMessageProcessor extends MessageProcessor<RealTimeThermostat, TemperatureChangeMessage> {
        static final int HIGH_TEMPERATURE = 80;
        @Override
        public ProcessingResult processMessages(ProcessingContext processingContext, RealTimeThermostat thermostat, Iterable<TemperatureChangeMessage> messages) throws Exception {
                // apply the updates from the messages
                for(TemperatureChangeMessage message : messages) {
                        thermostat.incrementTemperature(message.getTemperatureChange());
                }
                if(thermostat.getTemperature() > HIGH_TEMPERATURE) {
                        processingContext.sendToDataSource(new TemperatureChangeMessage(thermostat.getTemperature()));
                }
                return ProcessingResult.UpdateDigitalTwin;
        }
}

While a digital twin is processing incoming messages, it can send messages either back to the data source (e.g., an IoT device) or to another digital twin instance. This is accomplished with asynchronous methods defined in the ProcessingContext class. These methods immediately return after a message is successfully enqueued for delivery.

Note

For full ProcessingContext API documentation, please see the JavaDocs API Reference.

Note

Because more time is required to deliver messages sent from a real-time digital twin than it takes to enqueue them for sending, the system’s internal message queue can reach its capacity. If this happens or if an error occurs during message delivery, both methods return an error status.