This example posts JMS text messages containing JSON strings to a HornetQ JMS queue on a JBoss server.
It uses a clustered JMS setup on JBoss.
The test data consists of JSON text stored in *.json files.
For example,
the file TC1.json contains the following text:
{
"ID":"clkeidkerk948fjfmfkk49k",
"name":"Purushottam",
"value":"20000010"
}
Below are the steps:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.commons.io.FileUtils;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.UDPBroadcastGroupConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.jms.client.HornetQConnectionFactory;
public class PostQueue {
public static QueueConnection connection = null;
public PostQueue() {
}
private void initJMSLister() {
try {
String discoveryGroupName = "dg-group1";
String groupAddress = "231.7.7.7";
int groupPort = 9876;
int localPort = -1;
String localBindAddress = "142.133.174.76";
long refreshWaitTimeout = 1000;
long initialWaitTimeout = 5000;
|
The above is the start of the code.
Step 2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | UDPBroadcastGroupConfiguration udpConfig = new UDPBroadcastGroupConfiguration(groupAddress, groupPort, localBindAddress, localPort);
System.out.println("udpConfig=\n" + udpConfig);
DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryGroupName, refreshWaitTimeout, initialWaitTimeout,udpConfig);
System.out.println("groupConfiguration=\n" + groupConfiguration);
HornetQConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactoryWithHA(groupConfiguration,JMSFactoryType.CF);
System.out.println("jmsConnectionFactory=\n" + jmsConnectionFactory);
String queueName = "QueueName1";
final Queue queue = HornetQJMSClient.createQueue(queueName);
connection = jmsConnectionFactory.createQueueConnection("testuser", "testuser@1");
System.out.println("connection=\n" + connection);
final QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
QueueReceiver receiver = session.createReceiver(queue);
receiver.setMessageListener(new MessageListener() {
/**
 * Handles a message delivered from the queue by printing its text payload.
 *
 * <p>Messages that are not {@link TextMessage}s are reported and skipped, so
 * that a {@code ClassCastException} never escapes the listener.
 *
 * @param message the delivered JMS message
 */
@Override
public void onMessage(Message message) {
    System.out.println("Message received.");
    if (!(message instanceof TextMessage)) {
        System.out.println("Ignoring non-text message: " + message);
        return;
    }
    try {
        String jsonEventMessage = ((TextMessage) message).getText();
        // NOTE(review): the label says "Message Id" but the value printed is the message text.
        System.out.println("Message Id: " + jsonEventMessage);
    } catch (Exception e) {
        // Best effort: report the failure without letting it escape the listener.
        e.printStackTrace();
    }
}
});
connection.start();
|
Step 3: This part is explained in three sections, as shown below:
Listener part:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 | System.out.println(" Event Queue Listerner Started");
new Thread() {
/**
 * Sends every {@code *.json} file found (recursively) under
 * {@code /opt/jboss/testjms/testdata} to the queue as a text message,
 * pausing one second after each send.
 */
@Override
public void run() {
    System.out.println(" Event Queue Sender Started");
    QueueSession senderSession = null;
    try {
        // A JMS Session is a single-threaded context, and the enclosing method's
        // session already has an asynchronous MessageListener registered on it.
        // This thread therefore sends through a session of its own.
        senderSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        javax.jms.QueueSender sender1 = senderSession.createSender(queue);
        String[] ext = {"json"};
        Iterator<File> it = FileUtils.iterateFiles(new File("/opt/jboss/testjms/testdata"), ext, true);
        while (it.hasNext()) {
            File test = it.next();
            String tmp = FileUtils.readFileToString(test, "UTF-8");
            System.out.println(test.getName() + "\n" + tmp);
            TextMessage tm = senderSession.createTextMessage(tmp);
            sender1.send(tm);
            System.out.println("msg. " + test.getName() + " sent.");
            Thread.sleep(1000);
        }
    } catch (InterruptedException interrupted) {
        // Restore the interrupt flag so the interruption is not lost.
        Thread.currentThread().interrupt();
        interrupted.printStackTrace();
    } catch (Exception e1) {
        e1.printStackTrace();
    } finally {
        if (senderSession != null) {
            try {
                senderSession.close();
            } catch (JMSException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}
}.start();
|
Shutdown hook for JMS connection:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | Runtime.getRuntime().addShutdownHook(new Thread() {
/** Closes the shared JMS connection, if one was opened, when the shutdown hook runs. */
@Override
public void run() {
    if (connection == null) {
        return;
    }
    try {
        connection.close();
        System.out.println("connection closed");
    } catch (JMSException closeFailure) {
        closeFailure.printStackTrace();
    }
}
});
} catch (Exception ee) {
ee.printStackTrace();
}
}
|
Main method:
1
2
3
4
5
6
7
8
9
10 | public static void main(String[] args) {
try {
PostQueue ab = new PostQueue();
ab.initJMSLister();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
|