Sending Messages

Kafka clients can send messages to the ScaleOut Active Caching service.

Sending Messages

Once the Kafka connection has been created, you can use any supported Kafka tool or application to send a message on the registered topic to a specific MSG module SOSS object. The following example uses the kafka-console-producer.sh script to send a message to the SOSS object identified as “Instance_23”:

    $ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic MyMsgModuleTopic --property "parse.key=true" --property "key.separator=|"
    Instance_23|{"myMessageType":"initialMessage", "myStringStateChange":"initialMessage", "myIntegerStateChange":10, "timestamp":1561273200000}

Note

The payload needs to be a serialized message that can be deserialized by the module.

Note

The Kafka connection requires every Kafka message to specify a key. The key is used as the instance identifier for the corresponding SOSS object.
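As a minimal sketch of the keyed format described above, the following shell helper builds one "key|payload" record line and rejects keys that are empty or contain the separator. The function name format_record is hypothetical and not part of any ScaleOut or Kafka tool; the broker address and topic in the usage comment are taken from the example above.

```shell
# Hypothetical helper: build one "key|payload" record line in the format the
# console producer expects when parse.key=true and key.separator=| are set.
format_record() {
  key=$1
  payload=$2
  # Reject an empty key or a key that contains the separator character.
  case "$key" in
    ""|*"|"*) echo "invalid key: '$key'" >&2; return 1 ;;
  esac
  printf '%s|%s\n' "$key" "$payload"
}

# Usage (assumes a broker on localhost:9092 and the topic shown earlier):
# format_record "Instance_23" '{"myMessageType":"initialMessage"}' |
#   bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
#     --topic MyMsgModuleTopic \
#     --property "parse.key=true" --property "key.separator=|"
```

The payload should stay on a single line, because the console producer treats each input line as one record.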

Sending Messages During Message Processing

The KafkaConnection interface allows MSG module MessageProcessor implementations to publish messages to an Azure Kafka Event Hub or on-premises Kafka message broker.

Note

The Kafka connection must be running and enabled for messages to be sent to an Azure Kafka or on-premises Kafka broker.

Note

The Kafka topic must exist before a message is sent. If the topic does not exist, the Kafka broker must be configured to create topics automatically when a new message arrives; otherwise, the message will be dropped. See: https://kafka.apache.org/documentation/#brokerconfigs.
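One way to satisfy this requirement on an on-premises broker is to create the topic ahead of time with the kafka-topics.sh script that ships with Kafka. The sketch below is an assumption-laden illustration: the wrapper name ensure_topic, the DRY_RUN switch, the broker address, and the partition and replication settings are all hypothetical choices, not values from this documentation. The broker setting that controls automatic creation is auto.create.topics.enable.

```shell
# Hypothetical wrapper around kafka-topics.sh.
# Set DRY_RUN=1 to print the command instead of running it.
ensure_topic() {
  topic=$1
  bootstrap=${2:-localhost:9092}   # assumed default broker address
  # --if-not-exists makes the call a no-op when the topic is already present.
  set -- bin/kafka-topics.sh --create --if-not-exists \
    --bootstrap-server "$bootstrap" --topic "$topic" \
    --partitions 1 --replication-factor 1
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage:
# ensure_topic KafkaTopic                 # creates the topic if it is missing
# DRY_RUN=1 ensure_topic KafkaTopic       # only prints the command
```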

The following example demonstrates sending the UTF-8 string “Hello, KafkaTopic!” with the key “MyKey” to the Kafka topic “KafkaTopic”:

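// Requires: using System.Text; (for Encoding)
public override ProcessingResult ProcessMessages(ProcessingContext context, MySossObject dt, byte[] mySerializedMessage)
{
    // DeserializeMessage is a hypothetical placeholder; use your module's own deserializer.
    MyMessage msg = DeserializeMessage(mySerializedMessage);
    if (msg.SendToTopic())
    {
        // Publish the UTF-8 bytes of the string to "KafkaTopic" under the key "MyKey".
        context.KafkaConnection().PublishToTopicAsync("KafkaTopic", "MyKey", Encoding.UTF8.GetBytes("Hello, KafkaTopic!"));
    }
    return ProcessingResult.DoUpdate;
}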
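To check that the published message arrived, one option is to read the topic with the kafka-console-consumer.sh script and print each record's key. The sketch below assumes a broker on localhost:9092 and reuses the "|" separator from the producer example; the helper names record_key and record_value are hypothetical.

```shell
# Hypothetical helpers: split a "key|value" line printed by the console
# consumer when print.key=true and key.separator=| are set.
record_key()   { printf '%s\n' "${1%%|*}"; }   # text before the first "|"
record_value() { printf '%s\n' "${1#*|}"; }    # text after the first "|"

# Usage (assumes a broker on localhost:9092):
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
#     --topic KafkaTopic --from-beginning \
#     --property "print.key=true" --property "key.separator=|" |
#   while IFS= read -r line; do
#     echo "key=$(record_key "$line") value=$(record_value "$line")"
#   done
```

Splitting on the first separator only keeps any later "|" characters inside the value intact.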