Rabbitmq integration spring mvc

引包

<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

rabbitmq.properties

## rabbitmq basic parameter configuration##
username=guest
password=guest
host=192.168.74.167
port=5672
virtual_host=/

rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
       xsi:schemaLocation="http://www.springframework.org/schema/beans  
     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd  
     http://www.springframework.org/schema/beans  
     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd  
     http://www.springframework.org/schema/rabbit  
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
     "> 

     <!-- thread pool configuration-->
     <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">            
        <!-- core thread number, default is 1 -->            
        <property name="corePoolSize" value="10" />            
        <!-- The maximum number of threads, the default is Integer.MAX_VALUE -->            
        <property name="maxPoolSize" value="50" />            
        <!-- The maximum length of the queue, generally need to set the value>=notifyScheduledMainExecutor.maxNum; the default is Integer. MAX_VALUE -->            
        <property name="queueCapacity" value="3000" />            
        <!-- The idle time allowed by the thread pool maintenance thread, the default is 60s -->            
        <property name="keepAliveSeconds" value="300" />            
        <!-- The processing strategy of the thread pool for the reject task (wireless process available), At present, only AbortPolicy and CallerRunsPolicy are supported; the default is the latter-->            
        <property name="rejectedExecutionHandler">            
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />            
        </property>            
    </bean>


    <!-- Create connectionFactory -->  
    <rabbit:connection-factory id="connectionFactory" host="${host}" username="${username}"  
        password="${password}" port="${port}" virtual-host="${virtual_host}"/> 

     <!-- By specifying the following admin information, the exchange and queue in the current productor will Automatically generated on the rabbitmq server -->
     <rabbit:admin connection-factory="connectionFactory"/>

     <!-- Define the rabbit template for receiving and sending data-->
     <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />

     <!-- Broadcast-->
     <rabbit:fanout-exchange name="logs" durable="false" auto-delete="false">
         <rabbit:bindings>
            <rabbit:binding queue="logs_1"></rabbit:binding>
            <rabbit:binding queue="logs_2"></rabbit:binding>
         </rabbit:bindings>
     </rabbit:fanout-exchange>
     <!-- Line up --> 

    <rabbit:queue name="logs_1" durable="false" auto-delete="false" exclusive="false">
    </rabbit:queue>
    <rabbit:queue name="logs_2" durable="false" auto-delete="false" exclusive="false">
    </rabbit:queue>

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="task_queue" ref="taskListenter"/>
        <rabbit:listener queues="logs_1" ref="fanoutListenter" response-exchange="logs"/> 
        <rabbit:listener queues="logs_2" ref="fanoutListenter" response-exchange="logs"/> 
    </rabbit:listener-container>
</beans>     


applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans-4.1.xsd   
          http://www.springframework.org/schema/context   
          http://www.springframework.org/schema/context/spring-context-4.1.xsd   
          http://www.springframework.org/schema/aop   
          http://www.springframework.org/schema/aop/spring-aop-4.1.xsd   
          http://www.springframework.org/schema/tx    
          http://www.springframework.org/schema/tx/spring-tx-4.1.xsd">


    <context:component-scan base-package="com.hhly.*" />

    <!-- Load rabbitmq -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath*:rabbitmq.properties</value>
            </list>
        </property>
    </bean>

    <!-- Introduce configuration file -->
    <import resource="rabbitmq.xml"/>

 </beans>

生产

package com.hhly.rabbitmq.spring.produce;

public interface MQProducer {
    /**
     * Send a message to the specified queue
     * @param queueKey
     * @param object
     */
    public void sendDataToQueue(String queueKey, String message);
    /**
     * Send broadcast information
     * @author jiangwei
     * @Version 1.0
     *@CreatDateMarch 27, 2017 2:17:38 PM
     * @param exchange
     * @param object
     */
    public void sendDataToFanout(String exchange,String message);
}
package com.hhly.rabbitmq.spring.produce;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MQProducerImpl implements MQProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendDataToQueue(String queueKey, String message) {
        byte [] body= message.getBytes();
        MessageProperties properties = new MessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        //properties.setPriority(5);
        Message message2 = new Message(body,properties );
        amqpTemplate.send(queueKey,message2);
    }

    @Override
    public void sendDataToFanout(String exchange, String message) {
        amqpTemplate.convertAndSend(exchange, "", message);
    }

}

消费

package com.hhly.rabbitmq.spring.consumer;

import java.io.UnsupportedEncodingException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutListenter implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println(new String(message.getBody(),"UTF-8"));
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

测试类

package com.hhly.rabbitmq;

import java.util.UUID;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.hhly.rabbitmq.spring.produce.MQProducer;
@RunWith(SpringJUnit4ClassRunner.class)  
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class TestQueue {
    @Autowired
    MQProducer mqProducer;

    @Autowired
    private AmqpTemplate amqpTemplate;


    @Test
    public void sendFanout() {
        int i = 0;
        for(;;){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            String message = "hello,rabbmitmq!"+ i++;
            mqProducer.sendDataToFanout("logs", message);
        }
    }


}