Custom Search

Tuesday, January 27, 2009

JMS (ActiveMQ) using Spring

For a recent project, I wanted to get started with JMS and, after evaluating the available implementations, settled on ActiveMQ. I chose the Spring framework because of the range of integration options it offers with the other parts of the stack.

Here is a sample code fragment that uses ActiveMQ through Spring's JMS support. Prerequisites: download Apache ActiveMQ 5.2.0 and Spring JMS 2.5.6.A (you can use Ivy with the Spring repository to fetch them).

Launch the activemq binary before running the program below. The binary is usually available at $ACTIVEMQ_HOME/bin/activemq. It starts a TCP listening endpoint that speaks the OpenWire protocol. You will see lines similar to the following:

INFO  TransportServerThreadSupport   - Listening for connections at: <b>tcp://hostname:61616</b>
INFO  TransportConnector             - Connector <b>openwire</b> Started


package mymq;

import java.io.Serializable;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * Minimal ActiveMQ demo built on Spring's {@link JmsTemplate}.
 *
 * <p>One task sends a timestamped {@link FlyWeight} to the queue named
 * {@link #QUEUE_NAME} every two seconds; a second task receives messages from
 * the same queue and prints their payload. Both tasks run on the shared
 * two-thread {@link #service} pool, which this class never shuts down.
 *
 * <p>An ActiveMQ broker must be listening on {@code tcp://localhost:61616}
 * before the program is started.
 */
public class Producer {

    /** Serializable payload that wraps a single string. */
    public static class FlyWeight implements Serializable {

        /**
         * @param _msg the text carried by this payload
         */
        public FlyWeight(String _msg) {
            msg = _msg;
        }

        private String msg;

        /** Returns the wrapped text. */
        @Override
        public String toString() {
            return msg;
        }
    }

    /**
     * Starts the producer task and then the consumer task.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Producer prod = new Producer();
        prod.startProducer();
        prod.startConsumer();
    }

    /**
     * Submits a task that sends one {@link FlyWeight} holding the current
     * time (formatted as {@code HH:mm:ss}) to {@link #QUEUE_NAME} every two
     * seconds.
     *
     * <p>The task ends on the first failure. The failure is reported on
     * {@code System.err} instead of being silently discarded, and an
     * interrupt restores the thread's interrupt flag before the task returns.
     */
    public void startProducer() {
        service.submit(new Runnable() {

            @Override
            public void run() {
                try {
                    JmsTemplate template = new JmsTemplate(getConnectionFactory());
                    template.afterPropertiesSet();
                    // Only this task's thread uses the formatter, so the
                    // non-thread-safe SimpleDateFormat is not shared.
                    final DateFormat fmt = new SimpleDateFormat("HH:mm:ss");
                    while (true) {
                        Thread.sleep(1000 * 2);
                        template.send(QUEUE_NAME, new MessageCreator() {

                            @Override
                            public Message createMessage(Session session) throws JMSException {
                                ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
                                msg.setObject(new FlyWeight(fmt.format(new Date())));
                                return msg;
                            }

                        });
                    }
                } catch (InterruptedException ex) {
                    // Preserve the interrupt so the owning executor can observe it.
                    Thread.currentThread().interrupt();
                    System.err.println("Producer interrupted; stopping");
                } catch (Exception ex) {
                    // Task boundary: report the failure rather than dying silently.
                    System.err.println("Producer stopped after failure: " + ex);
                    ex.printStackTrace();
                }
            }
        });
    }

    /**
     * Submits a task that, every two seconds, receives one message from
     * {@link #QUEUE_NAME} and prints its payload to {@code System.out}.
     * Messages that are not {@link ActiveMQObjectMessage}s are reported on
     * {@code System.err}.
     *
     * <p>The task ends on the first failure. The failure is reported on
     * {@code System.err} instead of being silently discarded, and an
     * interrupt restores the thread's interrupt flag before the task returns.
     */
    public void startConsumer() {
        service.submit(new Runnable() {

            @Override
            public void run() {
                try {
                    JmsTemplate template = new JmsTemplate(getConnectionFactory());
                    template.afterPropertiesSet();
                    while (true) {
                        Thread.sleep(1000 * 2);
                        Message msg = template.receive(QUEUE_NAME);
                        if (msg == null) {
                            // receive() may yield no message (e.g. when a receive
                            // timeout expires); skip the round instead of
                            // dereferencing null below.
                            continue;
                        }
                        if (msg instanceof ActiveMQObjectMessage) {
                            ActiveMQObjectMessage text = (ActiveMQObjectMessage) msg;
                            System.out.println(text.getObject());
                        } else {
                            System.err.println("Message type invalid " + msg.getClass());
                        }
                    }
                } catch (InterruptedException ex) {
                    // Preserve the interrupt so the owning executor can observe it.
                    Thread.currentThread().interrupt();
                    System.err.println("Consumer interrupted; stopping");
                } catch (Exception ex) {
                    // Task boundary: report the failure rather than dying silently.
                    System.err.println("Consumer stopped after failure: " + ex);
                    ex.printStackTrace();
                }
            }
        });
    }

    /**
     * Builds a new connection factory pointing at the local broker.
     *
     * @return a fresh {@link ActiveMQConnectionFactory} whose broker URL is
     *         {@code tcp://localhost:61616}
     */
    static ConnectionFactory getConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        // Default port. Important: the 'activemq' script must already be
        // running for this program to work; by default it binds a TCP
        // listener (OpenWire protocol) on this address.
        factory.setBrokerURL("tcp://localhost:61616");
        return factory;
    }

    /** Name of the queue shared by the producer and consumer tasks. */
    static final String QUEUE_NAME = "MyQueue";

    /** Two-thread pool: one thread each for the producer and consumer tasks. */
    static ExecutorService service = Executors.newFixedThreadPool(2);
}

No comments: