Sending and Receiving Messages
Kafka clients can send messages to, and receive messages from, the ScaleOut Digital Twins service or an on-premises deployment.
Sending Messages
Once the Kafka connector has been created, you can use any supported Kafka tool or application to send a message on the registered topic to a specific real-time digital twin instance. For example, the following command uses the kafka-console-producer.sh script to send a message to the digital twin instance “Instance_23”. The first line starts the producer, and the second line is the keyed message typed at its prompt:
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic MyDigitalTwinTopic --property "parse.key=true" --property "key.separator=|"
Instance_23|{"myMessageType":"initialMessage", "myStringStateChange":"initialMessage", "myIntegerStateChange":10, "timestamp":1561273200000}
Note
The payload needs to deserialize to the message type defined by the specified real-time digital twin model.
Note
The Kafka connector requires messages within Kafka to specify a key. The key will be used as the instance identifier for the corresponding real-time digital twin instance.
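The input line in the console producer example above has the form key, separator, payload. As a minimal sketch of how a client might assemble such a line, the following Java helper joins an instance identifier and a JSON payload. The class and method names are hypothetical and are not part of the ScaleOut or Kafka APIs, and the sketch assumes the "|" separator configured through the key.separator property above.

```java
import java.util.Objects;

/** Hypothetical helper for illustration; not part of the ScaleOut or Kafka APIs. */
final class KeyedMessageLine {
    private KeyedMessageLine() {}

    /**
     * Builds one input line for kafka-console-producer.sh when parse.key=true:
     * the key (digital twin instance identifier), the separator, then the payload.
     */
    static String format(String instanceId, String separator, String jsonPayload) {
        Objects.requireNonNull(instanceId, "instanceId");
        Objects.requireNonNull(separator, "separator");
        Objects.requireNonNull(jsonPayload, "jsonPayload");
        if (instanceId.isEmpty() || separator.isEmpty()) {
            throw new IllegalArgumentException("instanceId and separator must be non-empty");
        }
        // The line is split at the first separator, so the key must not contain it.
        if (instanceId.contains(separator)) {
            throw new IllegalArgumentException("instanceId must not contain the key separator");
        }
        // The console producer reads one record per line.
        if (jsonPayload.indexOf('\n') >= 0 || jsonPayload.indexOf('\r') >= 0) {
            throw new IllegalArgumentException("payload must fit on a single line");
        }
        return instanceId + separator + jsonPayload;
    }

    public static void main(String[] args) {
        String payload = "{\"myMessageType\":\"initialMessage\", \"myIntegerStateChange\":10}";
        System.out.println(format("Instance_23", "|", payload));
    }
}
```

The payload itself may contain the separator character, because only the first occurrence on the line is treated as the boundary between key and value.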
Receiving Messages
Once the Kafka connector has been created, you can use any supported Kafka tool or application to receive messages on the topic. For example, the following command uses the kafka-console-consumer.sh script to receive messages from the topic “MyResponseTopic”:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyResponseTopic --from-beginning
Sending Messages During Message Processing
The DigitalTwinKafkaConnection interface allows message processing implementations to publish messages to an Azure Kafka Event Hub or on-premises Kafka message broker.
Note
The Kafka connector must be running and enabled for messages to be sent to an Azure Kafka or on-premises Kafka broker.
Note
The Kafka topic must exist before a message is sent to it. If the topic does not exist, the Kafka broker must be configured to create topics automatically when a new message arrives; otherwise, the message will be dropped. See: https://kafka.apache.org/documentation/#brokerconfigs.
Example:
// C# (Encoding requires: using System.Text;)
public override ProcessingResult ProcessMessages(ProcessingContext context, WindTurbineDigitalTwin dt, IEnumerable<DeviceTelemetry> newMessages)
{
    foreach (var msg in newMessages)
    {
        // Publish to the Kafka topic only for messages that request it.
        if (msg.SendToTopic())
        {
            context.DigitalTwinKafkaConnection().PublishToTopicAsync("KafkaTopic", "MyKey", Encoding.UTF8.GetBytes("Hello, KafkaTopic!"));
        }
    }
    return ProcessingResult.DoUpdate;
}
// Java (requires: import java.nio.charset.StandardCharsets;)
@Override
public ProcessingResult processMessages(ProcessingContext processingContext, MyDigitalTwin kafkaTestDigitalTwin, Iterable<MyMessage> incomingMessages) {
    for (MyMessage msg : incomingMessages) {
        // Publish to the Kafka topic only for messages that request it.
        if (msg.sendToTopic()) {
            DigitalTwinKafkaConnectionFactory.getConnection().publishToTopicAsync("KafkaTopic", "MyKey", "Hello, KafkaTopic".getBytes(StandardCharsets.UTF_8));
        }
    }
    return ProcessingResult.UpdateDigitalTwin;
}