Here we are using a cluster-based JMS setup on JBoss.
The test data consists of JSON text stored in *.json files.
For example, the file TC1.json contains the following text:
{ "ID":"clkeidkerk948fjfmfkk49k", "name":"Purushottam", "value":"20000010" }
Below are the steps:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import java.io.File; import java.io.IOException; import java.util.Iterator; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.commons.io.FileUtils; import org.hornetq.api.core.DiscoveryGroupConfiguration; import org.hornetq.api.core.UDPBroadcastGroupConfiguration; import org.hornetq.api.jms.HornetQJMSClient; import org.hornetq.api.jms.JMSFactoryType; import org.hornetq.jms.client.HornetQConnectionFactory; public class PostQueue { public static QueueConnection connection = null; public PostQueue() { } private void initJMSLister() { try { String discoveryGroupName = "dg-group1"; String groupAddress = "231.7.7.7"; int groupPort = 9876; int localPort = -1; String localBindAddress = "142.133.174.76"; long refreshWaitTimeout = 1000; long initialWaitTimeout = 5000; |
Step2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | UDPBroadcastGroupConfiguration udpConfig = new UDPBroadcastGroupConfiguration(groupAddress, groupPort, localBindAddress, localPort); System.out.println("udpConfig=\n" + udpConfig); DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryGroupName, refreshWaitTimeout, initialWaitTimeout,udpConfig); System.out.println("groupConfiguration=\n" + groupConfiguration); HornetQConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactoryWithHA(groupConfiguration,JMSFactoryType.CF); System.out.println("jmsConnectionFactory=\n" + jmsConnectionFactory); String queueName = "QueueName1"; final Queue queue = HornetQJMSClient.createQueue(queueName); connection = jmsConnectionFactory.createQueueConnection("testuser", "testuser@1"); System.out.println("connection=\n" + connection); final QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); QueueReceiver receiver = session.createReceiver(queue); receiver.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println("Message received."); TextMessage eventMessage = (TextMessage) message; String jsonEventMessage = null; try { jsonEventMessage = eventMessage.getText(); System.out.println("Message Id: "+jsonEventMessage); }catch(Exception e){e.printStackTrace();} } }); connection.start(); |
Step3: This part will explained in 3 sections as shown below:
Listener part:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | System.out.println(" Event Queue Listerner Started"); new Thread() { public void run() { System.out.println(" Event Queue Sender Started"); try { javax.jms.QueueSender sender1 = session.createSender(queue); String[] ext = {"json"}; Iterator<File> it = FileUtils.iterateFiles(new File("/opt/jboss/testjms/testdata"), ext, true); while(it.hasNext()){ File test = (File)it.next(); String tmp = FileUtils.readFileToString(test, "UTF-8"); System.out.println(test.getName()+"\n"+tmp); TextMessage tm = session.createTextMessage(tmp); sender1.send(tm); System.out.println("msg. "+test.getName() + " sent."); Thread.sleep(1000); } } catch (Exception e1) { e1.printStackTrace(); } } }.start(); |
Shutdown hook for JMS connection:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { if(connection!=null) try { connection.close(); System.out.println("connection closed"); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (Exception ee) { ee.printStackTrace(); } } |
Main method:
1 2 3 4 5 6 7 8 9 10 | public static void main(String[] args) { try { PostQueue ab = new PostQueue(); ab.initJMSLister(); } catch (Exception ex) { ex.printStackTrace(); } } } |