使用BlockingQueue实现简明的JMS连接异常恢复

JMS连接有可能出现异常,对于同步处理(MessageProducer发送或者MessageConsumer调用receive方法)来说,因为是主动调用的,因此可以通过简单的定时重发来重新创建连接。如果是异步接收,即通过实现MessageListener,就比较麻烦,因为是别人回调你的代码。

这里当然需要使用ExceptionListener,来监听jms连接异常。并在异常发生后做一些事情,比如试图恢复连接,如果不能恢复,间隔一段时间再试图连接。

以前做法,是ExceptionListener监听到异常后,在onException方法中设置一个标志对象,另外有一个线程来定时监听这个标志对象,发现异常后做连接的恢复,并重建和注册MessageListener和ExceptionListener到连接。方法比较混乱和复杂。

这里用java concurrent api的BlockingQueue和定时任务(见通过java concurrent实现定时任务)来简化这部分功能的实现。

BlockingQueue是Queue基础上,增加take和put方法,用来支持生产者消费者问题。take是消费者的动作,从Queue中获取队首的元素,如果队列为空,则自动线程阻塞,等待队列中有元素后再唤醒线程获得队首元素;put是生产者的动作,如果队列满(如果设置队列大小的话),则线程阻塞,直到有消费者获取队首元素后(队列不满了)唤醒线程加入元素到队列尾部。

在这里,BlockingQueue用来盛放JMSException,即,当ExceptionListener监听到连接异常后,在onException方法中将该JMSException加入队列。

创建队列:

private BlockingQueue<JMSException> exceptionsQueue = new LinkedBlockingQueue<JMSException>();

在ExceptionListener里的实现:

public void onException(JMSException exception) {
    exceptionsQueue.add(exception);
}

要有一个ScheduledExecutorService:

private ScheduledExecutorService scheduledExecutorService;

创建检查是否有异常并且做恢复的定时任务:

this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        logger.debug("check jms connection error.");
        try {
            exceptionsQueue.take();
        } catch (InterruptedException e) {
            logger.error("exception queue take error: "
                    + e.getMessage());
        }
        exceptionsQueue.clear();
        try {
            _init();
        } catch (JMSException e) {
            exceptionsQueue.add(e);
            return;
        }
    }
}, 1000, 1000 * 10, TimeUnit.MILLISECONDS);

 

完整的代码:

package stat.transfer.receive;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

import util.jms.JmsConfigBean;

public class MessageReceiveService {

    private static final Logger logger = Logger
            .getLogger(MessageReceiveService.class);

    @Autowired
    @Qualifier("jmsConfigBean")
    protected JmsConfigBean jmsConfigBean;

    private MessageReceiveWorker worker;

    private ScheduledExecutorService scheduledExecutorService;

    private BlockingQueue<JMSException> exceptionsQueue;

    public MessageReceiveService() {
        logger.debug("create message receive service …");
        this.exceptionsQueue = new LinkedBlockingQueue<JMSException>();
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
        this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                logger.debug("check jms connection error.");
                try {
                    exceptionsQueue.take();
                } catch (InterruptedException e) {
                    logger.error("exception queue take error: "
                            + e.getMessage());
                }
                exceptionsQueue.clear();
                try {
                    _init();
                } catch (JMSException e) {
                    exceptionsQueue.add(e);
                    return;
                }
            }
        }, 1000, 1000 * 10, TimeUnit.MILLISECONDS);
        logger.debug("create thread to monitoring jms connection error.");
    }

    public void _init() throws JMSException {
        logger.debug("create new worker …");
        this.worker = new MessageReceiveWorker();
        logger.debug("create new worker ok.");
    }

    public void init() {
        try {
            _init();
        } catch (JMSException e) {
            logger.error("create new worker error: " + e.getMessage());
            this.exceptionsQueue.add(e);
        }
    }

    public void close() {
        this.scheduledExecutorService.shutdownNow();
        try {
            this.worker.close();
        } catch (JMSException e) {
            logger.error("close message receive service error: "
                    + e.getMessage());
        }
    }

    private void doOnMessage(Session session) throws JMSException {
        // TODO
    }

    class MessageReceiveWorker implements MessageListener, ExceptionListener {

        private Connection connection;

        private Session session;

        private MessageConsumer consumer;

        public MessageReceiveWorker() throws JMSException {
            connection = jmsConfigBean.getConnectionFactory()
                    .createConnection();
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            consumer = session.createConsumer(session.createQueue(jmsConfigBean
                    .getQueueName()));
            consumer.setMessageListener(this);
            connection.setExceptionListener(this);
            connection.start();
        }

        public void close() throws JMSException {
            this.connection.stop();
            this.connection.close();
        }

        @Override
        public void onMessage(Message message) {
            logger.debug("receive message");
            try {
                doOnMessage(session);
                session.commit();
            } catch (JMSException e) {
                logger.error("on message error:" + e.getMessage(), e);
                throw new RuntimeException(e);
            }
            logger.debug("receive message ok.");
        }

        @Override
        public void onException(JMSException exception) {
            exceptionsQueue.add(exception);
        }

    }
}

 

这里使用到了spring做一些参数比如jms配置信息的IOC配置。就不贴出来了。

創建PDF格式    发送文章为PDF   

1 Comment to “使用BlockingQueue实现简明的JMS连接异常恢复”

  1. By zhdqcn, 2011年01月23日 @ 20:45

    您好,代码能分享一份吗?
    多谢了 zhdqCN@gmail.com

这篇文章上的评论的 RSS feed TrackBack URI

Leave a Reply