To start receiving messages published to topics, you have to create a subscriber that is configured to receive messages based on filters, transports, and topics.

A sample is shown below:

var topicName = "ChatTopic";
var subscriberName = "MySubscriber";

// _userName and port are assumed to be defined by the calling code.
Subscriber.New(subscriberName).Durable(false)
    .SubscribeTo(Topic.Select<ChatTopic>().NotEqual("UserName", _userName))
    .AddTransport("Tcp", Transport.New<TcpTransport>(
        transport => {
            transport.Format = TransportFormat.Json;
            transport.IPAddress = "127.0.0.1";
            transport.Port = port;
        }), topicName)
    .Save();


Each method call is explained below:

Creates a new subscriber instance with the name supplied to the New method:

Subscriber.New(subscriberName)



Specifies whether the subscriber is durable or non-durable. By default, all subscribers are non-durable. When this option is enabled, the ESB saves a message if the subscriber fails to receive it within the time specified by the topic publisher. If the publisher does not specify a message expiration, each message is kept for 30 days. The sample passes false, which keeps the non-durable default.

subscriber.Durable(false)
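
As a rough illustration of the retention rule described above, the sketch below checks whether a saved message has outlived its expiration, falling back to 30 days when the publisher set none. It does not use the ESB's types: MessageRetention, DefaultExpiration, and IsExpired are hypothetical names, and the library's actual retention logic may differ.

```csharp
using System;

// Hypothetical helper; not part of the ESB's API.
public static class MessageRetention
{
    // Fallback used when the publisher does not specify an expiration.
    public static readonly TimeSpan DefaultExpiration = TimeSpan.FromDays(30);

    // Returns true when the message is older than its expiration window.
    public static bool IsExpired(DateTime publishedUtc, TimeSpan? expiration, DateTime nowUtc)
    {
        return nowUtc - publishedUtc > (expiration ?? DefaultExpiration);
    }
}

public static class RetentionDemo
{
    public static void Main()
    {
        var published = new DateTime(2012, 4, 1, 0, 0, 0, DateTimeKind.Utc);

        // No expiration given: kept for 30 days, so day 29 is still valid.
        Console.WriteLine(MessageRetention.IsExpired(published, null, published.AddDays(29))); // False
        Console.WriteLine(MessageRetention.IsExpired(published, null, published.AddDays(31))); // True

        // Publisher-specified expiration of one hour.
        Console.WriteLine(MessageRetention.IsExpired(published, TimeSpan.FromHours(1), published.AddHours(2))); // True
    }
}
```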



Configures the subscriber to subscribe to a specific topic. This method can be called multiple times to subscribe to multiple topics. Each topic passed to SubscribeTo can be filtered with extension methods that set the conditions under which the subscriber receives the topic's messages. In the sample, the NotEqual filter on "UserName" excludes messages whose UserName matches _userName.

subscriber.SubscribeTo(Topic.Select<ChatTopic>().NotEqual("UserName", _userName))
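
To make the filter semantics concrete, here is a minimal, self-contained sketch of how a NotEqual condition could decide whether a message is delivered. It does not use the ESB's own types: TopicFilter, its Matches method, and the dictionary-based message are hypothetical stand-ins, and the real library may evaluate filters differently (for example, when a property is missing).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a topic filter; not the ESB's actual implementation.
public sealed class TopicFilter
{
    private readonly List<Func<IDictionary<string, object>, bool>> _conditions =
        new List<Func<IDictionary<string, object>, bool>>();

    // Adds a condition that passes when the named property differs from the value.
    // Assumption: a message without the property counts as "not equal".
    public TopicFilter NotEqual(string property, object value)
    {
        _conditions.Add(message =>
            !message.TryGetValue(property, out var actual) || !object.Equals(actual, value));
        return this;
    }

    // A message is delivered only if every condition passes.
    public bool Matches(IDictionary<string, object> message)
    {
        foreach (var condition in _conditions)
        {
            if (!condition(message)) return false;
        }
        return true;
    }
}

public static class FilterDemo
{
    public static void Main()
    {
        var filter = new TopicFilter().NotEqual("UserName", "alice");

        var fromAlice = new Dictionary<string, object> { ["UserName"] = "alice" };
        var fromBob = new Dictionary<string, object> { ["UserName"] = "bob" };

        Console.WriteLine(filter.Matches(fromAlice)); // False: alice's own messages are skipped
        Console.WriteLine(filter.Matches(fromBob));   // True
    }
}
```

In a chat scenario this is the typical use: a user subscribes to the chat topic but filters out the messages they published themselves.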



Configures which transport is used to receive the topic's messages. The first parameter is an alias that the OnMessageReceived method later uses to discover the correct transport to listen on. The last parameter is optional and should be used only when you want to map a specific transport to a topic. If the last parameter is omitted, the transport is automatically applied to every topic the subscriber is subscribed to, meaning that messages from multiple topics are stored in the same transport location. This method can be called once for each subscribed topic.

subscriber.AddTransport("Tcp", Transport.New<TcpTransport>(
    transport => {
        transport.Format = TransportFormat.Json;
        transport.IPAddress = "127.0.0.1";
        transport.Port = port;
    }), topicName)
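
The difference between a topic-specific transport and one applied to all topics can be sketched as a small lookup. Everything here is hypothetical: TransportRegistry and Resolve are illustrative names, transports are reduced to their alias strings, and the rule that a topic-specific mapping wins over the catch-all one is an assumption, not documented ESB behavior.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical registry; the ESB's real bookkeeping may differ.
public sealed class TransportRegistry
{
    // Key: topic name, or "" for a transport that applies to every subscribed topic.
    private readonly Dictionary<string, string> _aliasByTopic = new Dictionary<string, string>();

    // topicName is optional, mirroring the optional last parameter of AddTransport.
    public TransportRegistry AddTransport(string alias, string topicName = null)
    {
        _aliasByTopic[topicName ?? ""] = alias;
        return this;
    }

    // Assumed rule: prefer a transport mapped to the topic, then the catch-all one.
    public string Resolve(string topicName)
    {
        if (_aliasByTopic.TryGetValue(topicName, out var alias)) return alias;
        return _aliasByTopic.TryGetValue("", out var fallback) ? fallback : null;
    }
}

public static class RegistryDemo
{
    public static void Main()
    {
        var registry = new TransportRegistry()
            .AddTransport("Tcp", "ChatTopic") // mapped to one topic
            .AddTransport("Shared");          // applies to all other topics

        Console.WriteLine(registry.Resolve("ChatTopic")); // Tcp
        Console.WriteLine(registry.Resolve("NewsTopic")); // Shared
    }
}
```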



Commits the subscriber's subscription. If this method is not called, the subscription is lost.

subscriber.Save()

 

How to listen for messages on the transport created for the subscriber

 

subscriber.OnMessageReceived<ChatTopic>("Tcp", msg => HandleMessage(msg),
            interval: TimeSpan.FromMilliseconds(1),
            errorAction: h => {
                h.Continue = true;
                Console.WriteLine(h.Error.ToString());
            }); 

 

Explanation of logic:

The first parameter is the alias that was provided when AddTransport was called on the subscriber. The second parameter is a method that is called for each message received; the message is of the generic type T given to OnMessageReceived. The interval parameter specifies how often to check for new messages. The error action is called when an exception occurs, either in the message handler logic or when the transport listener fails internally. Inside the error action, you can set the Continue property to true to let the listener keep listening for new messages and try to recover as needed. By default, Continue is set to false.
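
The interplay between the handler, the interval, and the Continue flag can be sketched with a simplified polling loop. This is not the ESB's listener: ErrorContext and PollingListener.Listen are hypothetical, an in-memory queue stands in for the transport, and the loop ends when the queue is empty so that the example terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical error context, modeled on the h.Continue / h.Error usage shown above.
public sealed class ErrorContext
{
    public Exception Error { get; set; }
    public bool Continue { get; set; } // false by default, so the listener stops
}

public static class PollingListener
{
    // Polls the queue until it is empty or the error action declines to continue.
    // Returns the number of messages handled successfully.
    public static int Listen<T>(Queue<T> source, Action<T> handler,
        TimeSpan interval, Action<ErrorContext> errorAction)
    {
        int handled = 0;
        while (source.Count > 0)
        {
            T message = source.Dequeue();
            try
            {
                handler(message);
                handled++;
            }
            catch (Exception ex)
            {
                var context = new ErrorContext { Error = ex };
                errorAction?.Invoke(context);
                if (!context.Continue) break; // default: stop listening
            }
            Thread.Sleep(interval); // wait before checking for the next message
        }
        return handled;
    }
}

public static class ListenerDemo
{
    public static void Main()
    {
        var messages = new Queue<string>(new[] { "hello", "bad", "world" });

        int handled = PollingListener.Listen(messages,
            msg =>
            {
                if (msg == "bad") throw new InvalidOperationException("cannot handle: " + msg);
                Console.WriteLine(msg);
            },
            interval: TimeSpan.FromMilliseconds(1),
            errorAction: h =>
            {
                h.Continue = true; // keep listening after the failure
                Console.WriteLine(h.Error.Message);
            });

        Console.WriteLine(handled); // 2
    }
}
```

If the error action left Continue at its default of false, the loop in this sketch would stop at the failing message and "world" would never be handled.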

Last edited Apr 21, 2012 at 9:27 PM by rpgmaker, version 2