Sending Messages
Kafka clients can send messages to the ScaleOut Active Caching service.
Sending Messages
Once the Kafka connection has been created, you can use any supported Kafka tool or application to send a message on the registered topic to a specific MSG module SOSS object. For example, the following example shows how to send a message to a SOSS object, identified as “Instance_23”, through the kafka-console-producer.sh script. :
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic MyMsgModuleTopic --property "parse.key=true" --property "key.separator=|"
Instance_23|{"myMessageType":"initialMessage", "myStringStateChange":"initialMessage", "myIntegerStateChange":10, "timestamp":1561273200000}
Note
The payload needs to be a serialized message that can be deserialized by the module.
Note
The Kafka connection requires messages within Kafka to specify a key. The key will be used as the instance identifier for the corresponding SOSS object.
Sending Messages During Message Processing
The KafkaConnection interface allows MSG module MessageProcessor
implementations to publish messages to an Azure Kafka Event Hub or on-premises Kafka message broker.
Note
The Kafka connection must be running and enabled for messages to be sent to an Azure Kafka or on-premises Kafka broker.
Note
The Kafka topic must exist before sending a message. If the topic does not exist, then the Kafka broker must be configured to automatically create topics when a new message arrives, otherwise the message will be dropped. See: https://kafka.apache.org/documentation/#brokerconfigs.
The following example demonstrates sending a UTF-8 string “Hello, KafkaTopic” message to the Kafka topic “KafkaTopic”:
public override ProcessingResult ProcessMessages(ProcessingContext context, MySossObject dt, byte[] mySerializedMessage)
{
MyMessage msg = //deserialize message
if(msg.SendToTopic())
{
context.KafkaConnection().PublishToTopicAsync("KafkaTopic", "MyKey", Encoding.UTF8.GetBytes("Hello, KafkaTopic!"));
}
return ProcessingResult.DoUpdate;
}
@Override
public ProcessingResult processMessages(ProcessingContext processingContext, MySossObject mySossObject, Iterable<MyMessage> incomingMessages) {
MyMessage msg = //deserialize message
if(msg.sendToTopic()) {
KafkaConnectionFactory.getConnection().publishToTopicAsync("KafkaTopic", "MyKey", "Hello, KafkaTopic".getBytes(StandardCharsets.UTF_8));
}
return ProcessingResult.DoUpdate;
}