Overview

The ScaleOut Digital Twin Builder™ is a software toolkit for creating digital twin models for both real-time analytics and simulation. Models are deployed either to the ScaleOut Digital Twins™ service or on-premises using ScaleOut StreamServer. This toolkit provides:

  • Open source (Apache licensed) API libraries for Java and .NET that define base classes used by application developers to create models.

  • Methods for sending messages to digital twin instances from client applications.

  • Libraries to support development and deployment of models on your local workstation, an on-premises production server, or the ScaleOut Digital Twins™ service.

A digital twin model defines two components: a class definition that specifies the state data for each type of data source (for example, a wind turbine), which might include state variables and optional, time-ordered event collections, and a method for analyzing incoming event messages and updating state data. A simulation digital twin model adds a second method for processing the model at simulation time steps. The execution platform creates an instance of a digital twin model for each unique data source (for example, “wind turbine 17”).

When used for streaming analytics, ScaleOut’s digital twin model gives developers a simple, coherent semantic model for associating dynamic state information for each unique data source with a message-processing method that implements an application-specific, real-time analytics algorithm. This model is also designed to automatically handle low-level details, such as correlating incoming events by data source, accessing in-memory storage, and orchestrating message processing. This helps make applications easy to write and helps them achieve predictably high performance.

ScaleOut’s digital twin model uses well understood object-oriented techniques to simplify application design. It is well suited for building stream-processing applications and simulations in many fields, including industrial IoT, manufacturing, logistics, financial services, medical monitoring, ecommerce, security, and more. Its revolutionary approach to organizing application code and data for stream processing offers several key advantages over traditional, pipelined software platforms:

  • automatic tracking of dynamic state information associated with each data source, enabling deeper introspection

  • automatic correlation of incoming event messages by data source

  • a convenient, object-oriented basis for encapsulating message-processing algorithms for different types of data sources

  • a predefined data set for creating data-parallel analytics to detect aggregate trends across all data sources (of a given type) in real time

  • performance advantages, including reduced network overhead in accessing state information and automatic throughput scaling by the underlying compute engine

The ScaleOut Digital Twin Builder defines APIs in Java and C# that let application developers easily define a digital twin model for each type of entity (data source or device). Once deployed to the ScaleOut Digital Twins service and connected to data sources, instances of these models can process incoming messages and simulation events with consistently low latency, typically 1-3 milliseconds. They can also generate messages for their corresponding devices or other digital twin instances and send alerts to external systems. Models can be organized in a hierarchy to partition functions within a complex system, with low-level instances sending messages to higher-level instances within the hierarchy.

This overview describes the software architecture for digital twin models and then provides a tour of the Java and C# APIs in the toolkit along with several examples.

Development Process

After a new digital twin model is defined by an application, it is deployed using the ScaleOut Digital Twins UI. (On-premises applications can deploy models using either the UI or the API libraries.) This action ships the model’s class definition and associated method(s) to the ScaleOut Digital Twins execution platform in preparation for handling incoming messages. The software development and deployment process is illustrated in the following diagram:

image1

When used for streaming analytics, the ScaleOut Digital Twins execution platform automatically creates a new real-time digital twin instance when a message first arrives from its corresponding data source. Digital twin instances can also be created using a CSV file. Simulations can dynamically create new instances during execution. To maximize simplicity, all data sources are identified with a single string identifier, and instances are identified by a combination of an application-defined string model name (for example, “windTurbine” or “generator”) and the device’s string identifier (for example, “wt17”).
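The create-on-first-message behavior and the two-part instance identity can be pictured with a minimal sketch. It is not the platform's implementation: TurbineState, InstanceRegistry, and keyFor are hypothetical names, and a plain in-memory map stands in for the execution platform.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an application-defined state object.
class TurbineState {
    final String model;
    final String id;
    int messageCount;

    TurbineState(String model, String id) {
        this.model = model;
        this.id = id;
    }
}

// Hypothetical bookkeeping sketch; the real execution platform handles this internally.
class InstanceRegistry {
    private final Map<String, TurbineState> instances = new HashMap<>();

    // Combine the model name and the data-source identifier into one lookup key.
    static String keyFor(String model, String id) {
        return model + "/" + id;
    }

    // Return the instance for (model, id), creating it when its first message arrives.
    TurbineState onMessage(String model, String id) {
        TurbineState state =
            instances.computeIfAbsent(keyFor(model, id), k -> new TurbineState(model, id));
        state.messageCount++;
        return state;
    }

    int instanceCount() {
        return instances.size();
    }
}
```

In this sketch, two messages for ("windTurbine", "wt17") reach the same instance, while a message for ("generator", "wt17") creates a separate one because the model name is part of the identity.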

Development Using the Workbench Environment

To help you quickly learn how to build digital twin models, you can create and debug models using the open source “workbench” development environment hosted on your workstation. This lets you maintain full control over the environment so that you can catch exceptions, verify state changes as messages stream through, and diagnose coding issues prior to deployment.

The workbench environment lets you test your model locally to exercise its functionality. Your program can run fully featured simulations in addition to streaming messages to digital twin instances. Additionally, your program can receive messages from a digital twin instance and review log or alert messages.

Details of the workbench environment for each language runtime are explained in the section API Libraries below.

Software Architecture of the Digital Twin Model

The ScaleOut Digital Twin Builder software toolkit includes API libraries for Java and C# applications. The core datatypes used are available in open source libraries from the following GitHub repositories:

When you build a digital twin model, you use the types from the API libraries to implement the following components:

  • a base class definition for the model’s state object which describes the state information to be maintained by each instance of the model

  • a base class definition for a message-processing method which encapsulates the application code that processes and analyzes incoming messages using the state object

  • for simulation, a base class definition for a second processing method which encapsulates the application code that processes simulation time steps to describe the evolving state of the digital twin model using the state object

  • a processing context class that supplies context information to the methods and includes utility methods, for example, for sending a message back to a data source or to another instance

Messages are assumed to be instances of a user-defined type, and all messages must be serialized using JSON. (JSON serialization is also used to store state objects within the ScaleOut service.) Messages can be sent from any data source (or from another instance) to instances implemented in any language. All instances are identified by a combination of a digital twin model name (string) and a unique instance identifier (string). When sending messages from a data source, the technique for specifying these two items varies with the type of connector used. For example, with AWS IoT Core, a topic is created for each combination of model and instance identifier. In Azure IoT Hub, the model name must be stored within the message, and the instance identifier is supplied by the hub when a message is delivered.

To minimize storage requirements and processing latency, a record of incoming messages is not maintained by default within a digital twin instance. Instead, an application optionally can incorporate time-ordered message lists as part of an instance’s dynamic state, and these lists can be managed and analyzed using ScaleOut’s time-windowing libraries. These lists can be used to record all incoming messages and/or significant events and alerts, as dictated by the needs of the application.
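As a rough illustration of a time-ordered list kept inside an instance's state, the sketch below uses only standard Java collections. It does not use ScaleOut's time-windowing libraries; TimedEvent, EventHistory, and their methods are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timestamped event; field names are illustrative.
class TimedEvent {
    final long timestampMillis;
    final String description;

    TimedEvent(long timestampMillis, String description) {
        this.timestampMillis = timestampMillis;
        this.description = description;
    }
}

// Hypothetical event list that an application could embed in its state object.
class EventHistory {
    private final List<TimedEvent> events = new ArrayList<>();

    // Insert so the list stays sorted by timestamp, even when events arrive out of order.
    void add(TimedEvent event) {
        int index = events.size();
        while (index > 0 && events.get(index - 1).timestampMillis > event.timestampMillis) {
            index--;
        }
        events.add(index, event);
    }

    // Count events whose timestamps fall in [endMillis - windowMillis, endMillis].
    int countInWindow(long endMillis, long windowMillis) {
        int count = 0;
        for (TimedEvent event : events) {
            if (event.timestampMillis >= endMillis - windowMillis
                    && event.timestampMillis <= endMillis) {
                count++;
            }
        }
        return count;
    }

    // Return a copy so callers cannot disturb the ordering.
    List<TimedEvent> snapshot() {
        return new ArrayList<>(events);
    }
}
```

Because the list is part of the state object, it is only worth maintaining when the application needs the history, which matches the opt-in design described above.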

The following sections take a closer look at the API components.

State Object

This object holds application-defined, in-memory state information for each instance of a real-time digital twin model. It is used to provide deep context for analyzing incoming messages and hosting simulation state. Because it is stored in memory within the execution platform, it is immediately available to message-processing code for access and updating when a message arrives or to process simulation events.

For each digital twin model, a state object is defined by the application as a subclass of a toolkit-defined base type. In Java, the base type is as follows (reference):

public abstract class DigitalTwinBase {
  protected String Id;
  protected String Model;

  public void init(String id, String model) {
    this.Id = id;
    this.Model = model;
  }
}

In C#, the base type looks like this (reference):

public abstract class DigitalTwinBase
{
  public string Id { get; private set; }
  public string Model { get; private set; }

  public void Init(string id, string model)
  {
    this.Id = id;
    this.Model = model;
  }
}

Messages

Messages sent to digital twin instances from data sources have an arbitrary, application-defined payload and are assumed to be serialized using JSON. They are addressed to a digital twin model and instance identifier, both of which are specified as strings. Once received by the model’s message-processing method, messages may be deserialized as an instance of a language-defined type. It is the responsibility of the application to avoid de-serialization exceptions.

When used to implement streaming analytics, data sources can send messages to their corresponding real-time digital twin instances, instances can send messages back to their respective data sources, and instances can send messages to other instances within a hierarchy. Client applications also can send messages to instances using the API libraries. Messages can represent events, alerts, or commands. Their interpretation is defined by application-defined message properties and the message-processing method.

Message Processor

A message-processing method, also called a message processor, describes the application code used to receive and analyze incoming messages for a digital twin model. In particular, it specifies the set of parameters and return value for the application-defined method.

In Java, the message processor abstract base type is defined as follows (reference):

public abstract class MessageProcessor<T extends DigitalTwinBase, V>
  extends MessageProcessorBase<T> implements Serializable {

  public abstract ProcessingResult processMessages(ProcessingContext context,
                                                   T stateObject,
                                                   Iterable<V> incomingMessages) throws Exception;

  @Override
  public ProcessingResult processMessages(ProcessingContext context,
                                          T twin,
                                          MessageListFactory factory) throws Exception {
    Iterable<V> incoming = factory.getIncomingMessages();
    return this.processMessages(context, twin, incoming);
  }
}

In C#, it is defined as follows (reference):

public abstract class MessageProcessor<TDigitalTwin, TMessage> :
  MessageProcessor where TDigitalTwin: class
{
  public abstract ProcessingResult ProcessMessages(ProcessingContext context,
                                                   TDigitalTwin digitalTwin,
                                                   IEnumerable<TMessage> newMessages);

  internal override ProcessingResult ProcessMessages(ProcessingContext context,
                                                     DigitalTwinBase digitalTwin,
                                                     IMessageListFactory messageListFactory)
  {
    IEnumerable<TMessage> newMessages = messageListFactory.GetIncomingMessageList<TMessage>();
    return ProcessMessages(context, digitalTwin as TDigitalTwin, newMessages);
  }
}

The message processor supplies the following parameters to the application’s code:

  • a processing context, which supplies methods that the application can use to send messages back to its respective data source or to another instance

  • the instance’s state object

  • a list of incoming messages from the data source to be handled by the message processor

This method returns a result of type ProcessingResult (or Boolean in JavaScript) that indicates whether the state object has been modified and should be updated in the execution platform’s in-memory data store.
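The meaning of the return value can be sketched without the toolkit. In the example below, Result is a local stand-in for ProcessingResult (its value names follow the Java samples in this document), and ThermostatState and ThermostatProcessor are hypothetical. The processor reports an update only when a message actually changes the state.

```java
import java.util.List;

// Local stand-in for the toolkit's ProcessingResult type.
enum Result { NoUpdate, UpdateDigitalTwin }

// Hypothetical state object that tracks the highest reading seen so far.
class ThermostatState {
    double maxTemperature = Double.NEGATIVE_INFINITY;
}

class ThermostatProcessor {
    // Report UpdateDigitalTwin only if at least one message modified the state.
    static Result processMessages(ThermostatState state, List<Double> readings) {
        Result result = Result.NoUpdate;
        for (double reading : readings) {
            if (reading > state.maxTemperature) {
                state.maxTemperature = reading;
                result = Result.UpdateDigitalTwin;
            }
        }
        return result;
    }
}
```

Returning the no-update value when nothing changed lets the platform skip an unnecessary write of the state object.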

Simulation Processor

A simulation-processing method, also called a simulation processor, describes the application code used to process a digital twin’s simulation model at each time step in a simulation. In particular, it specifies the set of parameters and return value for the application-defined method.

In Java, the simulation processor abstract base type is defined as follows (reference):

public abstract class SimulationProcessor<T extends DigitalTwinBase> implements Serializable {

  public abstract ProcessingResult processModel(ProcessingContext context, T instance, Date epoch);
}

In C#, the simulation processor abstract base type is defined as follows (reference):

public abstract class SimulationProcessor<TDigitalTwin> :
  SimulationProcessor where TDigitalTwin: class
{
  public abstract ProcessingResult ProcessModel(ProcessingContext context,
                                                TDigitalTwin digitalTwin,
                                                DateTimeOffset currentTime);

  internal override ProcessingResult ProcessModel(ProcessingContext context,
                                                  DigitalTwinBase digitalTwin,
                                                  DateTimeOffset currentTime)
  {
    return ProcessModel(context, digitalTwin as TDigitalTwin, currentTime);
  }

  internal override TimeSpan SimulationInterval { get; }
}

The simulation processor supplies the following parameters to the application’s code:

  • a processing context, which supplies methods that the application can use to send messages to another instance, create/remove simulation instances, and delay simulation processing

  • the instance’s state object

  • the current simulation time

This method returns a result of type ProcessingResult (or Boolean in JavaScript) that indicates whether the state object has been modified and should be updated in the execution platform’s in-memory data store.
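To make the time-step idea concrete, here is a minimal sketch of a driver loop that calls a per-step method with the current simulation time. It does not use the toolkit: PumpState, PumpSimulation, the drain rate, and the boolean return value (true meaning the state was modified) are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical simulation state for a pump that drains a tank.
class PumpState {
    double fluidLevel = 100.0;
    Instant lastStepTime;
}

class PumpSimulation {
    static final double DRAIN_PER_SECOND = 0.5; // assumed rate, for illustration only

    // Process one time step; returns true when the state was modified.
    static boolean processModel(PumpState state, Instant currentTime, Duration step) {
        if (state.fluidLevel <= 0.0) {
            return false; // tank is empty, so nothing changes
        }
        double drained = DRAIN_PER_SECOND * step.getSeconds();
        state.fluidLevel = Math.max(0.0, state.fluidLevel - drained);
        state.lastStepTime = currentTime;
        return true;
    }

    // Minimal driver: advance simulation time from start (inclusive) to end (exclusive).
    static int run(PumpState state, Instant start, Instant end, Duration step) {
        int updates = 0;
        for (Instant now = start; now.isBefore(end); now = now.plus(step)) {
            if (processModel(state, now, step)) {
                updates++;
            }
        }
        return updates;
    }
}
```

In the actual platform the driver loop is supplied by the simulation engine; the application writes only the per-step method.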

Processing Context

An instance of a processing context is passed to the application’s message processor to provide context-specific features, in particular, methods for sending messages back to the instance’s data source or to other real-time digital twin instances. The processing context also defines a method that applications can use to log messages in the cloud service’s UI, which displays these messages separately for each model. It provides a mechanism for sending alerts to a third-party monitoring service, such as Splunk On-Call, Slack, or PagerDuty. Finally, it offers two methods for controlling timers: one to start a timer and one to stop an existing timer.

The Java version is as follows (reference):

public abstract class ProcessingContext implements Serializable
{
  public abstract String getDataSourceId();
  public abstract String getDigitalTwinModel();

  public abstract SendingResult sendToDataSource(byte[] payload);
  public abstract SendingResult sendToDigitalTwin(String model, String id, byte[] payload);
  public abstract void logMessage(Level severity, String message);
  public abstract SendingResult sendAlert(String providerName, AlertMessage alertMessage);
  public abstract TimerActionResult startTimer(String timerName, Duration interval, TimerType type, TimerHandler timerCallback);
  public abstract TimerActionResult stopTimer(String timerName);
}

In C#, the processing context is defined as follows (reference):

public abstract class ProcessingContext
{
  public abstract string DataSourceId { get; }
  public abstract string DigitalTwinModel { get; }

  public abstract SendingResult SendToDataSource(byte[] message);
  public abstract SendingResult SendToTwin(string targetTwinModel, string targetTwinId, byte[] message);
  public abstract void LogMessage(LogSeverity severity, string message);
  public abstract SendingResult SendAlert(string providerName, AlertMessage alertMessage);
  public abstract TimerActionResult StartTimer(string timerName, TimeSpan interval, TimerType type, TimerHandler timerCallback);
  public abstract TimerActionResult StopTimer(string timerName);
}

Application Endpoint

When used for streaming analytics, client applications can serve as data sources for real-time digital twins and send messages to instances running within the execution platform. The ScaleOut Digital Twin Builder toolkit includes APIs in Java, C#, JavaScript, and REST for sending messages.

A Java application can send a message to a real-time digital twin instance with the following static method defined in the AppEndpoint class, which is located in the digitaltwin-datasource library:

public static SendingResult send(String model, String id, byte[] jsonMessage)

The model parameter specifies the name of the real-time digital twin model, and the id parameter specifies an identifier for the instance of the model, i.e., a specific data source. All messages are assumed to be serialized in JSON.

Likewise, a C# application can send a message to a real-time digital twin instance as follows:

public static SendingResult Send(string model, string id, byte[] jsonMessage)
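Both send methods take the message as a byte array containing JSON. The sketch below shows one way a Java client might build such a payload using only the standard library. HeartRatePayload and the property names heartRate and timestamp are hypothetical; a real application would more likely use a JSON library that matches the message type expected by the model.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds a small JSON payload by hand.
class HeartRatePayload {
    // Encode the two numeric fields as a UTF-8 JSON object.
    // String concatenation avoids locale-dependent number formatting.
    static byte[] toJsonBytes(int heartRate, long timestampMillis) {
        String json = "{\"heartRate\":" + heartRate + ",\"timestamp\":" + timestampMillis + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }
}
```

The resulting array could then be passed as the jsonMessage argument, for example AppEndpoint.send("HeartRateTracker", "tracker_1717", payload).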

A JavaScript client can send a message using a function defined in the ScaleOut client library:

const dtclient = require('@scaleoutsoftware/digital_twin_client');
dtclient.sendToTwin('<model>', '<instance id>', JSON.stringify(<msg>));

An application can also post a message to a digital twin using a REST service. Here’s a simple JavaScript sample that accesses the REST service:

/// Send a message to a specified digital twin instance:
/// The message must be in JSON format -- e.g., JSON.stringify(message)
function sendMessage(twinModel, twinId, message) {
  let xhttp = new XMLHttpRequest();
  xhttp.open("POST",`${serverUrl}/api/messages/${twinModel}/${twinId}`, true);
  xhttp.setRequestHeader("Content-type", "application/json");
  xhttp.send(message);
}
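The same REST call can be sketched in Java with the standard java.net.http API (Java 11 or later). The URL layout is taken from the JavaScript sample above; RestMessageRequest and its build method are hypothetical names, and serverUrl is assumed to be the base URL of your deployment. The sketch only constructs the request and leaves sending to the caller.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that prepares a POST request for the messages endpoint.
class RestMessageRequest {
    // Build (but do not send) a request that posts a JSON message to one instance.
    // Model names or identifiers with special characters would need URL encoding first.
    static HttpRequest build(String serverUrl, String twinModel, String twinId, String jsonMessage) {
        URI uri = URI.create(serverUrl + "/api/messages/" + twinModel + "/" + twinId);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonMessage))
                .build();
    }
}
```

To send the request, pass it to java.net.http.HttpClient, for example HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).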

Timers

A digital twin instance executes code via the MessageProcessor whenever it receives a message. In addition, timers let you run code or logic in your digital twins at specified time intervals. For example, you can use a timer to determine if a device has failed to send a message within an allowed interval and thereby detect that it might have failed or is operating erratically.
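The missing-message check mentioned above can be sketched as plain logic that a timer handler might run. DeviceState, HeartbeatCheck, and the boolean return value (true meaning the state changed and should be saved) are hypothetical and independent of the toolkit's types.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical state: when the device last reported and whether it is suspected to have failed.
class DeviceState {
    Instant lastMessageTime;
    boolean suspectedFailure;
}

class HeartbeatCheck {
    // Flag the device when it has been silent longer than allowed.
    // Returns true only when the flag changed, so callers know whether to save the state.
    static boolean checkForSilence(DeviceState state, Instant now, Duration allowedSilence) {
        Duration silence = Duration.between(state.lastMessageTime, now);
        boolean overdue = silence.compareTo(allowedSilence) > 0;
        if (overdue == state.suspectedFailure) {
            return false; // no change since the last check
        }
        state.suspectedFailure = overdue;
        return true;
    }
}
```

A message processor would update lastMessageTime on each incoming message, and a recurring timer would call the check at a fixed interval.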

Whether you are creating a code-based model (.NET or Java) or a rules-based model, you can easily define these time-based blocks of logic.

Defining a timer

A timer can be defined as one of two types: one-time or recurring.

A one-time timer fires once after the specified time interval has elapsed. A recurring timer fires at the specified time interval until it is explicitly stopped.
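The difference between the two timer types can be shown by listing when each would fire. This is only an illustration of the semantics, not how the service schedules timers; TimerSchedule and firingTimes are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class TimerSchedule {
    // List firing times (milliseconds after the timer starts) up to and including the horizon.
    // A one-time timer fires at most once; a recurring timer fires every interval.
    static List<Long> firingTimes(boolean recurring, long intervalMillis, long horizonMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        List<Long> times = new ArrayList<>();
        for (long t = intervalMillis; t <= horizonMillis; t += intervalMillis) {
            times.add(t);
            if (!recurring) {
                break; // a one-time timer stops after its first firing
            }
        }
        return times;
    }
}
```

With a 15-second interval and a 60-second horizon, the one-time timer fires only at 15 seconds, while the recurring timer fires at 15, 30, 45, and 60 seconds.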

To define a timer, you need to provide:

  • A name for your timer

  • The time interval

  • The type of your timer (one-time or recurring)

  • The code or logic to execute when the timer elapses

Note

The maximum number of timers you can define for each model is set to five.

To define the code to execute in .NET and Java models, you define a timer handler or callback method to be invoked when the timer elapses. In rules-based models, the logic is defined as a set of rules to execute when this timer expires.

More details are available in the sections below.

Starting or stopping a timer

A timer does not start or stop automatically. You can use the ProcessingContext StartTimer and StopTimer functions to control a timer’s behavior. Call these functions with the name of the timer you want to start or stop. In rules engine models, use the START_TIMER and STOP_TIMER functions.

Timers in .NET

The handler is defined as a public static method that matches the TimerHandler delegate. You pass it as a parameter to the ProcessingContext StartTimer function when you start a given timer.

The TimerHandler delegate method has the following signature:

public delegate ProcessingResult TimerHandler(string timerName, DigitalTwinBase instance, ProcessingContext context);

When the timer elapses, the ScaleOut Digital Twins service will call the handler method and provide three parameters:

  1. timerName: the handler will be provided with the timer name being invoked.

  2. instance: the digital twin instance that this timer is relevant to.

  3. context: the ProcessingContext object, giving you access to all the public ProcessingContext API (e.g. SendAlert, LogMessage, etc.)

Note

The return value of your timer handler should reflect whether the instance was modified as part of the handler code. Return ProcessingResult.DoUpdate when digital twin object data has been modified so that the instance is saved. Return ProcessingResult.NoUpdate if the handler did not modify the twin object and the instance does not need to be saved.

The sample code below defines a timer handler in C#:

public class SampleModel : DigitalTwinBase
{
    // [...]

    public static ProcessingResult CheckForUpdates(string timerName, DigitalTwinBase instance, ProcessingContext context)
    {
        // You have access to the SampleModel instance, passed as a DigitalTwinBase
        SampleModel dt = instance as SampleModel;
        if (dt != null)
        {
            // Here, define the logic of your handler. You can view or modify the instance's properties
            // or even use the ProcessingContext to send messages to other instances, send alerts, etc.
        }

        // In this case, this handler only reads properties of the instances but does not modify the data
        return ProcessingResult.NoUpdate;
    }
}

Once this handler is defined, use the ProcessingContext StartTimer to start the execution of the timer:

public class SampleMessageProcessor : MessageProcessor<SampleModel,SampleMessage>
{
    // [...]

    public override ProcessingResult ProcessMessages(
        ProcessingContext context,
        SampleModel digitalTwin,
        IEnumerable<SampleMessage> newMessages)
    {
        // [...]

        // Define a condition to start your timer
        if (digitalTwin.Temperature > MaxAcceptableValue)
        {
            var result = context.StartTimer(
                "CheckValuesForHighTemperatures",
                TimeSpan.FromSeconds(15),
                TimerType.Recurring,
                SampleModel.CheckForUpdates);

            if (result != TimerActionResult.Success)
            {
                context.LogMessage(LogSeverity.Error, "Failed to start CheckValuesForHighTemperatures timer for digital twin.");
            }

            // [...]
        }

        // [...]

        // You can also define a condition to stop it
        if (digitalTwin.Temperature < AnotherThreshold)
        {
            var timerStopped = context.StopTimer("CheckValuesForHighTemperatures");
        }

        return ProcessingResult.DoUpdate;
    }
}

Timers in Java

The handler is defined as a class that implements the TimerHandler interface and overrides its onTimedMessage method. You pass an instance of this class as a parameter to the ProcessingContext startTimer function when you start a given timer.

The TimerHandler would be implemented as follows:

public class TimerModelTimerHandler implements TimerHandler<SampleModel> {
    public TimerModelTimerHandler() {}
    @Override
    public ProcessingResult onTimedMessage(String timerName, SampleModel twinInstance, ProcessingContext context) {

        // Define the logic of your handler. You can view or modify the instance's properties
        // or even use the ProcessingContext to send messages to other instances, send alerts, etc.

        // <logic here>

        // In this case, this handler modifies properties of the instances
        return ProcessingResult.UpdateDigitalTwin;
    }
}

When the timer elapses, the ScaleOut Digital Twins service will call the onTimedMessage method and provide three parameters:

  1. timerName: the handler will be provided with the timer name being invoked.

  2. twinInstance: the digital twin instance that this timer is relevant to.

  3. context: the ProcessingContext object, giving you access to all the public ProcessingContext API (e.g. sendAlert, logMessage, etc.)

Note

The return value of your timer handler should reflect whether the instance was modified as part of the handler code. Return ProcessingResult.UpdateDigitalTwin when digital twin object data has been modified so that the instance is saved. Return ProcessingResult.NoUpdate if the handler did not modify the twin object and the instance does not need to be saved.

Once this handler is defined, use the ProcessingContext startTimer method to start the execution of the timer:

// [...]

// Define a condition to start your timer
if (digitalTwin.Temperature > MaxAcceptableValue)
{
    var result = context.startTimer("CheckValuesForHighTemperatures", Duration.ofSeconds(15), TimerType.Recurring, new TimerModelTimerHandler());
    if (result != TimerActionResult.Success)
    {
        // Java has no string interpolation, so the message is built with String.format
        context.logMessage(Level.INFO, String.format("Failed to start CheckValuesForHighTemperatures timer for digital twin %s of model %s: %s.", digitalTwin.Id, digitalTwin.Model, result));
    }

    // [...]
}

// [...]

// You can also define a condition to stop it
if (digitalTwin.Temperature < AnotherThreshold)
{
    var timerStopped = context.stopTimer("CheckValuesForHighTemperatures");
}

Timers in rules-based models

No code is involved for timers in rules-based models. A timer is defined by a set of rules to execute when the timer elapses. Please refer to the Timers section in the description of rules-based models to see how to add timers using the ScaleOut Model Development Tool.

Sample Programs

The following sample programs illustrate the use of these concepts in building a simple real-time digital twin model to process incoming messages and implement streaming analytics. These samples illustrate how instances can maintain dynamic state information that is unique to their respective data sources. This state information enables deeper introspection on the contents of incoming messages and better feedback and alerting than would otherwise be possible.

Heart-Rate Tracking Sample (Java)

This sample tracks heart-rate telemetry from a wearable device called a heart-rate tracker and detects incidents of heart-rate spikes that might indicate a risk to the user. The device sends periodic messages reporting heart-rate with a timestamp to its real-time digital twin instance. The instance’s state object includes user-specific parameters, such as age, body mass index, and a maximum allowed heart-rate that is not considered a spike. It also includes dynamic state variables that track the duration of an incident in which heart-rate spikes are occurring. The message processor code looks for heart-rate spikes reported by incoming messages and then tracks the duration of each incident in which they are occurring. It also alerts the device when the duration exceeds allowable limits based on the user’s parameters.

A message from the device is defined as follows:

public class HeartRate {
  private int _hr; // heart-rate
  private long _ts; // timestamp

  public HeartRate(int hr, long timestamp) {
    _hr = hr;
    _ts = timestamp;
  }

  public long getTimestamp() {
    return _ts;
  }

  public int getHeartrate() {
    return _hr;
  }

  public byte[] toJson() {return /* <serialize to JSON> */}
}

An alert message back to the device can be defined as follows:

public class HeartRateAlert {
  private String _msg; // alert message
  private long _ts; // timestamp

  public HeartRateAlert(String msg, long timestamp) {
    _msg = msg;
    _ts = timestamp;
  }

  // used by the tracker to keep recorded alerts in time order
  public long getTimestamp() {
    return _ts;
  }

  public byte[] toJson() {return /* <serialize to JSON> */}
}

A very simple real-time digital twin class definition for the heart-rate tracker which includes user parameters and dynamic state information can be defined as follows. Some of the details regarding helper classes are omitted.

public class HeartRateTracker extends DigitalTwinBase {
  // user's parameters:
  private AgeRange userAgeRange;
  private BodyMassIndex userBmi;
  private Medication userMedication;

  // dynamic state for tracking heart-rate spikes:
  private int heartRateSpikeCount;
  private long maxHeartRate;
  private boolean heartRateSpikeInProgress;
  private long heartRateSpikeStartTime;
  private List<HeartRateAlert> alertList;

  public HeartRateTracker() {
    // handle tracker initialization
  }

  public AgeRange getUserAgeRange() {
    return userAgeRange;
  }

  public BodyMassIndex getUserBmi() {
    return userBmi;
  }

  public Medication getMedication() {
    return userMedication;
  }

  public int getHeartRateSpikeCount() {
    return heartRateSpikeCount;
  }

  public long getMaxHeartRate() {
    return maxHeartRate;
  }

  public boolean isSpikeInProgress() {
    return heartRateSpikeInProgress;
  }

  public long getSpikeStartTime() {
    return heartRateSpikeStartTime;
  }

  public void stopSpikeTracking() {
    heartRateSpikeCount = 0;
    heartRateSpikeInProgress = false;
  }

  public void startSpikeTracking(long timestamp) {
    heartRateSpikeCount++;
    heartRateSpikeInProgress = true;
    heartRateSpikeStartTime = timestamp;
  }

  public void recordAlert(HeartRateAlert alert) {
    // use the TimeWindowing library to add to list in time-order:
    Utils.addTimeOrdered(alertList, HeartRateAlert::getTimestamp, alert);
  }
}

This digital twin’s message processor is as follows:

public class HeartRateMessageProcessor extends MessageProcessor<HeartRateTracker, HeartRate> {

  @Override
  public ProcessingResult processMessages(ProcessingContext processingContext,
                                          HeartRateTracker tracker,
                                          Iterable<HeartRate> incomingMessages) throws Exception {
    ProcessingResult result = NoUpdate;
    long unixTimeNow = System.currentTimeMillis();

    // analyze incoming messages:
    for(HeartRate msg : incomingMessages)
    {
      // if the message indicates a heart-rate spike, track it:
      if(msg.getHeartrate() > tracker.getMaxHeartRate())
      {
        // if this is the first spike, start tracking an incident:
        if (!(tracker.isSpikeInProgress()))
        {
          tracker.startSpikeTracking(msg.getTimestamp());
          result = UpdateDigitalTwin;
        }

        // alert user if spike reached duration that exceeds
        // limit allowed by user's age, bmi, or medication:
        long duration = unixTimeNow - tracker.getSpikeStartTime();

        if (duration > tracker.getUserAgeRange().getThreshold() ||
            duration > tracker.getUserBmi().getThreshold() ||
            duration > tracker.getMedication().getThreshold())
        {
          // send alert message to data source:
          HeartRateAlert alert = new HeartRateAlert(String.format(
              "Heart-rate exceeded (%d, %d) - current hr %d for %d ms.",
              tracker.getUserAgeRange().getThreshold(),
              tracker.getUserBmi().getThreshold(),
              msg.getHeartrate(),
              duration), unixTimeNow);

          processingContext.sendToDataSource(alert.toJson());

          // record the alert for time-window analysis:
          tracker.recordAlert(alert);

          result = ProcessingResult.UpdateDigitalTwin;
        }
      }

      // otherwise, if the incident has ended, reset the tracking state:
      else if (tracker.isHeartRateSpikeInProgress())
      {
        tracker.stopSpikeTracking();
        result = ProcessingResult.UpdateDigitalTwin;
      }
    }
    return result;
  }
}
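
To make the state transitions in this message processor easier to follow, here is a self-contained sketch that runs the same spike-tracking idea without the ScaleOut libraries. It rests on several assumptions: SpikeTrackerSketch is a hypothetical class, it uses a single duration limit instead of the separate age, BMI, and medication thresholds, and it measures duration from message timestamps rather than the wall clock so that the run is repeatable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the tracker state; not the ScaleOut API.
final class SpikeTrackerSketch {
  final int maxHeartRate;
  final long maxSpikeMillis; // single limit standing in for age/BMI/medication
  boolean spikeInProgress;
  long spikeStartTime;
  int spikeCount;
  final List<String> alerts = new ArrayList<>();

  SpikeTrackerSketch(int maxHeartRate, long maxSpikeMillis) {
    this.maxHeartRate = maxHeartRate;
    this.maxSpikeMillis = maxSpikeMillis;
  }

  // Applies one reading and returns true if the tracked state changed.
  boolean onReading(int heartRate, long timestamp) {
    if (heartRate > maxHeartRate) {
      boolean changed = false;
      // first reading of a spike: start tracking an incident
      if (!spikeInProgress) {
        spikeInProgress = true;
        spikeStartTime = timestamp;
        spikeCount++;
        changed = true;
      }
      // record an alert once the spike has lasted longer than the limit
      long duration = timestamp - spikeStartTime;
      if (duration > maxSpikeMillis) {
        alerts.add("spike lasted " + duration + " ms");
        changed = true;
      }
      return changed;
    }
    // reading is back in range: end the incident if one was in progress
    if (spikeInProgress) {
      spikeInProgress = false;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    SpikeTrackerSketch tracker = new SpikeTrackerSketch(120, 5000L);
    int[] rates = {80, 150, 155, 160, 90};
    long[] times = {0L, 1000L, 4000L, 7000L, 8000L};
    for (int i = 0; i < rates.length; i++) {
      boolean changed = tracker.onReading(rates[i], times[i]);
      System.out.println(rates[i] + " bpm at " + times[i] + " ms, state changed: " + changed);
    }
    System.out.println("spikes: " + tracker.spikeCount + ", alerts: " + tracker.alerts);
  }
}
```

Returning a changed flag mirrors the processor's choice between the NoUpdate and UpdateDigitalTwin results: state is only reported as updated when a reading actually altered it.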

For test purposes, a Java client program can send a message to instance “tracker_1717” of a HeartRateTracker real-time digital twin model as follows:

byte[] msg = new HeartRate(130, System.currentTimeMillis()).toJson();

SendingResult result = AppEndpoint.send("HeartRateTracker", "tracker_1717", msg);

switch (result) {
  case Handled:
    System.out.println("Message was delivered and handled.");
    break;
  case NotHandled:
    System.out.println("Message was not handled.");
    break;
}

Wind Turbine Tracking Sample (C#)

This sample demonstrates how telemetry from a wind turbine can be tracked and analyzed. As with the previous sample, it illustrates how a real-time digital twin instance can track both parameters and dynamic state variables for each data source and use this information to introspect on the significance of incoming messages from the data source.

In this sample, messages from a wind turbine report the turbine’s temperature with a timestamp. If a high-temperature (“overtemp”) condition is detected, its duration is tracked. If the wind turbine is in a pre-maintenance period, an alert is sent to the data source after a shorter duration than is permitted under normal conditions. The length of the pre-maintenance period is determined by the wind turbine’s model. A list of incidents is maintained for future analysis; it records when the turbine enters an overtemp condition, when an alert is sent, and when the overtemp condition is resolved.

Telemetry messages from the wind turbine are defined as follows:

public class DeviceTelemetry
{
  public int Temp { get; set; }
  public DateTime Timestamp { get; set; }
}

Here are some other helper data structures used in the state object and message processor:

public enum IncidentType
{
  OverTempDetected,
  OverTempAlert,
  OverTempResolved
}

public enum WindTurbineModel
{
  Model17,
  Model18,
  Model19
}

public class Incident
{
  public IncidentType Description { get; set; }
  public DateTime TimeStamp { get; set; }
  public int MetricValue { get; set; }
  public bool InPreMaintPeriod { get; set; }
}

public class Alert
{
  public IncidentType Description { get; set; }
  public DateTime TimeStamp { get; set; }
  public TimeSpan Duration { get; set; }
  public bool InPreMaintPeriod { get; set; }
}

The wind turbine’s real-time digital twin class definition contains the following parameters and state variables:

public class WindTurbine : DigitalTwinBase
{
  // physical characteristics:
  public const string DigitalTwinModelType = "windturbine";
  public WindTurbineModel TurbineModel { get; set; } = WindTurbineModel.Model17;
  public DateTime NextMaintDate { get; set; } = DateTime.UtcNow.AddMonths(36);
  public const int MaxAllowedTemp = 100; // in Celsius
  public TimeSpan MaxTimeOverTempAllowed = TimeSpan.FromMinutes(10);
  public TimeSpan MaxTimeOverTempAllowedPreMaint = TimeSpan.FromMinutes(2);

  // dynamic state variables:
  public bool TrackingOverTemp { get; set; }
  public DateTime OverTempStartTime { get; set; }
  public int NumberMsgsWithOverTemp { get; set; }

  // list of incidents and alerts:
  public List<Incident> IncidentList { get; } = new List<Incident>();
}

The message processor code for this real-time digital twin model is defined as follows:

public class WindTurbineMessageProcessor : MessageProcessor<WindTurbine, DeviceTelemetry>
{
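  // pre-maintenance period for each turbine model
  // (illustrative values; replace with the periods for your turbines):
  static readonly Dictionary<WindTurbineModel, TimeSpan> _preMaintPeriod =
    new Dictionary<WindTurbineModel, TimeSpan>
    {
      { WindTurbineModel.Model17, TimeSpan.FromDays(30) },
      { WindTurbineModel.Model18, TimeSpan.FromDays(45) },
      { WindTurbineModel.Model19, TimeSpan.FromDays(60) }
    };
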
  public override ProcessingResult ProcessMessages(ProcessingContext context,
                                                  WindTurbine dt,
                                                  IEnumerable<DeviceTelemetry> newMessages)
  {
    var result = ProcessingResult.NoUpdate;

    // determine if we are in the pre-maintenance period for this
    // wind turbine model:
    var preMaintTimePeriod = _preMaintPeriod[dt.TurbineModel];
    bool isInPreMaintPeriod = (dt.NextMaintDate - DateTime.UtcNow) < preMaintTimePeriod;

    // process incoming messages:
    foreach (var msg in newMessages)
    {
      // if message reports a high temp indication, track it:
      if (msg.Temp > WindTurbine.MaxAllowedTemp)
      {
        dt.NumberMsgsWithOverTemp++;
        if (!dt.TrackingOverTemp)
        {
          dt.TrackingOverTemp = true;
          dt.OverTempStartTime = DateTime.UtcNow;

          // add a notification to the incident list:
          dt.IncidentList.Add(new Incident() {
            Description = IncidentType.OverTempDetected,
            TimeStamp = dt.OverTempStartTime,
            MetricValue = msg.Temp,
            InPreMaintPeriod = isInPreMaintPeriod });
        }

        TimeSpan duration = DateTime.UtcNow - dt.OverTempStartTime;

        // If we have exceeded the max allowed time for an over-temperature
        // condition in either a normal or pre-maintenance period, send an alert:
        if (duration > dt.MaxTimeOverTempAllowed ||
              (isInPreMaintPeriod && duration > dt.MaxTimeOverTempAllowedPreMaint))
        {
          var alert = new Alert();

          alert.Description = IncidentType.OverTempAlert;
          alert.TimeStamp = DateTime.UtcNow;
          alert.Duration = duration;
          alert.InPreMaintPeriod = isInPreMaintPeriod;

          // send message back to data source:
          context.SendToDataSource(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(alert)));

          // add a notification to the incident list:
          dt.IncidentList.Add(new Incident() {
            Description = IncidentType.OverTempAlert,
            TimeStamp = DateTime.UtcNow,
            MetricValue = dt.NumberMsgsWithOverTemp,
            InPreMaintPeriod = isInPreMaintPeriod });
        }

        result = ProcessingResult.DoUpdate;
      }

      // otherwise, stop tracking the condition and reset our state:
      else if (dt.TrackingOverTemp)
      {
        dt.TrackingOverTemp = false;
        dt.NumberMsgsWithOverTemp = 0;

        // add a notification to the incident list:
        dt.IncidentList.Add(new Incident() {
          Description = IncidentType.OverTempResolved,
          TimeStamp = DateTime.UtcNow,
          MetricValue = msg.Temp,
          InPreMaintPeriod = isInPreMaintPeriod });

        result = ProcessingResult.DoUpdate;
      }
    }
    return result;
  }
}
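
The alert decision in this message processor can be isolated as a pair of small pure functions, which makes the interaction between the normal and pre-maintenance limits easier to check. The sketch below is written in Java to match the earlier heart-rate sample. The class OverTempPolicySketch and its method names are hypothetical, the 10-minute and 2-minute limits are taken from the WindTurbine class definition above, and the 30-day pre-maintenance period and the dates are assumed values chosen only for the demonstration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper that isolates the alert decision; not part of the ScaleOut API.
final class OverTempPolicySketch {
  // limits taken from the WindTurbine class definition in the sample
  static final Duration NORMAL_LIMIT = Duration.ofMinutes(10);
  static final Duration PRE_MAINT_LIMIT = Duration.ofMinutes(2);

  // True when the time remaining until the next maintenance date
  // is shorter than the pre-maintenance period.
  static boolean inPreMaintPeriod(Instant now, Instant nextMaintDate, Duration preMaintPeriod) {
    return Duration.between(now, nextMaintDate).compareTo(preMaintPeriod) < 0;
  }

  // Same condition as the sample: alert when the normal limit is exceeded, or
  // when the turbine is in pre-maintenance and the shorter limit is exceeded.
  static boolean shouldAlert(Duration overTempDuration, boolean inPreMaint) {
    return overTempDuration.compareTo(NORMAL_LIMIT) > 0
        || (inPreMaint && overTempDuration.compareTo(PRE_MAINT_LIMIT) > 0);
  }

  public static void main(String[] args) {
    // assumed values for the demonstration:
    Instant now = Instant.parse("2024-01-01T00:00:00Z");
    Instant nextMaintDate = Instant.parse("2024-01-15T00:00:00Z");
    Duration preMaintPeriod = Duration.ofDays(30);

    boolean preMaint = inPreMaintPeriod(now, nextMaintDate, preMaintPeriod);
    Duration overTemp = Duration.ofMinutes(5);

    System.out.println("in pre-maintenance period: " + preMaint);
    System.out.println("alert in pre-maintenance: " + shouldAlert(overTemp, true));
    System.out.println("alert under normal conditions: " + shouldAlert(overTemp, false));
  }
}
```

With these values, a five-minute overtemp condition triggers an alert only when the turbine is in its pre-maintenance period, which is the behavior the sample description calls for.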

A C# client application can then send a message to wind turbine instance “WT17” as follows:

SendingResult result = AppEndpoint.Send("WindTurbine", "WT17", jsonMessage);
if (result == SendingResult.Handled)
  Console.WriteLine("Message was delivered and processed successfully");
else
  Console.WriteLine("Failed to process message");

Next Steps

The ScaleOut Digital Twin Builder toolkit provides powerful, easy-to-use APIs for building digital twin models, deploying them for execution on-premises, and using them to provide deep introspection on the dynamic behavior of data sources or to build simulations. This toolkit was designed to dramatically simplify the use of digital twins and to enable applications to easily implement powerful streaming analytics and large-scale simulations.

Here are the steps you should follow to get started using ScaleOut Digital Twin Builder:

  • If you are developing a Java application, you can find an open source, Apache-licensed library for building digital twin models on GitHub here.

  • If you are developing a C# application, you can install and use the Scaleout.DigitalTwin.Templates package to create a new .NET Core digital twin model project. For more information, click here.

  • If you are developing a JavaScript application, please download a JavaScript package from npmjs.org here.

We invite your questions and feedback! Please contact us at support@scaleoutsoftware.com.