使用BlockingQueue实现简明的JMS连接异常恢复
JMS连接有可能出现异常,对于同步处理(MessageProducer发送或者MessageConsumer调用receive方法)来说,因为是主动调用的,因此可以通过简单的定时重发来重新创建连接。如果是异步接收,即通过实现MessageListener,就比较麻烦,因为是别人回调你的代码。
这里当然需要使用ExceptionListener,来监听jms连接异常。并在异常发生后做一些事情,比如试图恢复连接,如果不能恢复,间隔一段时间再试图连接。
以前做法,是ExceptionListener监听到异常后,在onException方法中设置一个标志对象,另外有一个线程来定时监听这个标志对象,发现异常后做连接的恢复,并重建和注册MessageListener和ExceptionListener到连接。方法比较混乱和复杂。
这里用java concurrent api的BlockingQueue和定时任务(见通过java concurrent实现定时任务)来简化这部分功能的实现。
BlockingQueue是Queue基础上,增加take和put方法,用来支持生产者消费者问题。take是消费者的动作,从Queue中获取队首的元素,如果队列为空,则自动线程阻塞,等待队列中有元素后再唤醒线程获得队首元素;put是生产者的动作,如果队列满(如果设置队列大小的话),则线程阻塞,直到有消费者获取队首元素后(队列不满了)唤醒线程加入元素到队列尾部。
在这里,BlockingQueue用来盛放JMSException,即,当ExceptionListener监听到连接异常后,在onException方法中将该JMSException加入队列。
创建队列:
private BlockingQueue<JMSException> exceptionsQueue = new LinkedBlockingQueue<JMSException>();
在ExceptionListener里的实现:
public void onException(JMSException exception) {
exceptionsQueue.add(exception);
}
要有一个ScheduledExecutorService:
private ScheduledExecutorService scheduledExecutorService;
创建检查是否有异常并且做恢复的定时任务:
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
logger.debug("check jms connection error.");
try {
exceptionsQueue.take();
} catch (InterruptedException e) {
logger.error("exception queue take error: "
+ e.getMessage());
}
exceptionsQueue.clear();
try {
_init();
} catch (JMSException e) {
exceptionsQueue.add(e);
return;
}
}
}, 1000, 1000 * 10, TimeUnit.MILLISECONDS);
完整的代码:
package stat.transfer.receive;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;import util.jms.JmsConfigBean;
public class MessageReceiveService {
private static final Logger logger = Logger
.getLogger(MessageReceiveService.class);@Autowired
@Qualifier("jmsConfigBean")
protected JmsConfigBean jmsConfigBean;private MessageReceiveWorker worker;
private ScheduledExecutorService scheduledExecutorService;
private BlockingQueue<JMSException> exceptionsQueue;
public MessageReceiveService() {
logger.debug("create message receive service …");
this.exceptionsQueue = new LinkedBlockingQueue<JMSException>();
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
logger.debug("check jms connection error.");
try {
exceptionsQueue.take();
} catch (InterruptedException e) {
logger.error("exception queue take error: "
+ e.getMessage());
}
exceptionsQueue.clear();
try {
_init();
} catch (JMSException e) {
exceptionsQueue.add(e);
return;
}
}
}, 1000, 1000 * 10, TimeUnit.MILLISECONDS);
logger.debug("create thread to monitoring jms connection error.");
}public void _init() throws JMSException {
logger.debug("create new worker …");
this.worker = new MessageReceiveWorker();
logger.debug("create new worker ok.");
}public void init() {
try {
_init();
} catch (JMSException e) {
logger.error("create new worker error: " + e.getMessage());
this.exceptionsQueue.add(e);
}
}public void close() {
this.scheduledExecutorService.shutdownNow();
try {
this.worker.close();
} catch (JMSException e) {
logger.error("close message receive service error: "
+ e.getMessage());
}
}private void doOnMessage(Session session) throws JMSException {
// TODO
}class MessageReceiveWorker implements MessageListener, ExceptionListener {
private Connection connection;
private Session session;
private MessageConsumer consumer;
public MessageReceiveWorker() throws JMSException {
connection = jmsConfigBean.getConnectionFactory()
.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createQueue(jmsConfigBean
.getQueueName()));
consumer.setMessageListener(this);
connection.setExceptionListener(this);
connection.start();
}public void close() throws JMSException {
this.connection.stop();
this.connection.close();
}@Override
public void onMessage(Message message) {
logger.debug("receive message");
try {
doOnMessage(session);
session.commit();
} catch (JMSException e) {
logger.error("on message error:" + e.getMessage(), e);
throw new RuntimeException(e);
}
logger.debug("receive message ok.");
}@Override
public void onException(JMSException exception) {
exceptionsQueue.add(exception);
}}
}
这里使用到了spring做一些参数比如jms配置信息的IOC配置。就不贴出来了。
1 Comment to “使用BlockingQueue实现简明的JMS连接异常恢复”
这篇文章上的评论的 RSS feed TrackBack URI
By zhdqcn, 2011年01月23日 @ 20:45
您好,代码能分享一份吗?
多谢了 zhdqCN@gmail.com