创建博客 登录  
 关注
   显示下一条  |  关闭

小白的博客

Caribbean是白色的

 
 
 

日志

 
 

Spring集成 JMS OpenMQ  

2010-11-24 13:14:13|  分类: glassfish |  标签: |字号 订阅

来源: http://www.javabloger.com/article/spring-jms-openmq.html

前段时间采用 JMS API 直接访问 OpenMQ JMS 服务器时会出现一个现象:当 JMS 服务器 down 掉或者重启以后,JMS 的接收端将无法继续工作。如果将程序改成 MDB 的方式,又违背了我们的初衷,所以采用一个相对折中的办法:用 Spring 整合 JMS OpenMQ。

不仅可以解决我们现在存在的问题,并且有以下优势:

1、占用资源少,对硬件配置要求低
2、部署简单、灵活,不限制于某种特定的J2EE容器
3、依托微容器管理扩展性强

代码如下:

连接器

package com.javabloger.jms;

import java.util.Enumeration;
import java.util.Properties;

import javax.jms.XAConnectionFactory;

public class OpenMqConnectionFactory {
   
    private Properties props;

    public void setProperties(Properties props) {
        this.props = props;
    }

    public XAConnectionFactory createConnectionFactory(){
        com.sun.messaging.XAConnectionFactory cf =
                new com.sun.messaging.XAConnectionFactory();
        try{
            Enumeration<?> keys = props.propertyNames();
            while (keys.hasMoreElements()) {
                String name = (String)keys.nextElement();
                String value = props.getProperty(name);
                cf.setProperty(name, value);
            }
        } catch (Exception e){
            throw new RuntimeException(
            "MQConnectionFactoryFactory.createConnectionFactory() failed: "+
            e.getMessage(), e);
        }
        return cf;
    }
}
 

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context wiring an Open MQ connection factory, a queue, a JmsTemplate
     and a message listener container for the "testmq" queue. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
     http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
     http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

    <!-- Helper bean holding the Open MQ connection properties (broker address list). -->
    <bean id="connectionfactoryfactory" class="com.javabloger.jms.OpenMqConnectionFactory">
        <property name="properties">
            <props>
                <prop key="imqAddressList">192.168.20.210:7677,192.168.20.211:7677</prop>
            </props>
        </property>
    </bean>

    <!-- The actual connection factory, produced by the helper bean's factory method. -->
    <bean id="mqConnectionFactory" factory-bean="connectionfactoryfactory"
        factory-method="createConnectionFactory" />

    <!-- Destination: the queue named "testmq". -->
    <bean id="testmq" class="com.sun.messaging.Queue">
        <constructor-arg type="java.lang.String" value="testmq" />
    </bean>

    <!-- Template using the factory and queue above as its defaults. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="defaultDestination" ref="testmq" />
        <property name="receiveTimeout" value="20000" />
    </bean>

    <!-- Adapter around SimpleMessageListener; the message converter is disabled
         by setting it to null. -->
    <bean id="messageListener1"
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="com.javabloger.jms.SimpleMessageListener" />
        </constructor-arg>
        <property name="defaultListenerMethod" value="receive" />
        <property name="messageConverter">
            <null />
        </property>
    </bean>

    <!-- Active consumer: listens on "testmq" with a transacted session.
         NOTE(review): Spring documents transactionTimeout in seconds and
         receiveTimeout in milliseconds; confirm that 180000 is intended for both. -->
    <bean id="consumercontainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="destination" ref="testmq" />
        <property name="messageListener" ref="messageListener1" />
        <property name="transactionTimeout" value="180000" />
        <property name="receiveTimeout" value="180000" />
        <property name="sessionTransacted" value="true" />
    </bean>

    <!-- Disabled alternative container without transaction settings:
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="destination" ref="testmq" />
        <property name="messageListener" ref="messageListener1" />
    </bean>
    -->
</beans>

接收端

package com.javabloger.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class SimpleMessageListener implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println("Received message: "+
                                      ((TextMessage) message).getText());
           
            }
            catch (JMSException ex) {
                System.out.println(
                "SimpleMessageListener.onMessage(): got exception: "+ex.getMessage());
                ex.printStackTrace();
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException(
            "MessageListener.onMessage(): Message must be of type TextMessage");
        }
    }
}
 

启动 Spring 与 JMS 集成的客户端代码

package com.javabloger.jms;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MessageConsumer {

    public static void main(String[] args) {
       
        ApplicationContext context =  new ClassPathXmlApplicationContext( "jms.xml");
          System.out.println( context.getId()   ) ;
         
    }

}

发送端代码

package com.javabloger.jms;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.ConnectionFactory;
import com.sun.messaging.Queue;

public class QueueSender {

    /**
     * @param args
     * @throws JMSException
     */
    public static void main(String[] args) throws JMSException {
 
         ConnectionFactory myConnFactory;
         myConnFactory = new com.sun.messaging.ConnectionFactory();
         myConnFactory.setProperty(ConnectionConfiguration.imqAddressList,  "mq://192.168.20.211:7677");
         myConnFactory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
         Connection myConn = myConnFactory.createConnection();
         myConn.start();
         //Step 4:
         //Create a session within the connection.
         Session mySess = myConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue myTopic=new com.sun.messaging.Queue("testmq");
         MessageProducer myMsgProducer = mySess.createProducer(myTopic);
        
        
         TextMessage myTextMsg = mySess.createTextMessage();
         for (int i=0;i<10;i++){
             myTextMsg.setText("Queue Msg ID: "+i+"  "+ new java.util.Date() );
             System.out.println( myTextMsg.getText()); 
             myMsgProducer.send(myTextMsg);
         }
     
        
         mySess.close();
         myConn.close();
    }

}
  评论这张
转发至微博
转发至微博
0   分享到:        
阅读(251)| 评论(0)| 引用 (0) |举报

历史上的今天

相关文章

最近读者

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--相关文章--> <#--历史上的今天--> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2012