Integrating Spring with Middleware (RocketMQ, Kafka, RabbitMQ, ActiveMQ, ZeroMQ): ActiveMQ

Editor: admin · 320 views · 2022.08.02

Previous article: Integrating Spring with Middleware (RocketMQ, Kafka, RabbitMQ): RabbitMQ


For environment setup, first see: Installing ActiveMQ on Windows 10

Simple send and receive with ActiveMQ in plain Java

Project structure

│  pom.xml
│
└─src
    ├─main
    │  ├─java
    │  │  └─com
    │  │      └─hong
    │  │          └─activemq
    │  │                  Consumer.java
    │  │                  Producer.java
    │  │
    │  └─resources
    └─test
        └─java

com.hong.activemq.Consumer

package com.hong.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author: csh
 * @Date: 2021/4/19 15:06
 * @Description: consumer
 */
public class Consumer {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(Producer.URL);
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Queue destination = session.createQueue(Producer.queueName);
        MessageConsumer consumer = session.createConsumer(destination);
        // Register a listener; the connection stays open so messages keep arriving
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                // Ignore anything that is not a text message instead of failing on the cast
                if (!(message instanceof TextMessage)) {
                    return;
                }
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("Received message: " + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

com.hong.activemq.Producer

package com.hong.activemq;


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author: csh
 * @Date: 2021/4/19 15:05
 * @Description: ActiveMQ producer
 */
public class Producer {
    // Broker address
    public static final String URL = "tcp://localhost:61616";
    // Queue name
    public static final String queueName = "QUEUE_HONG";

    public static void main(String[] args) throws JMSException {
        // 1. Create the ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        // 2. Create the Connection
        Connection connection = connectionFactory.createConnection();
        try {
            // 3. Start the connection
            connection.start();
            // 4. Create a session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination
            Destination destination = session.createQueue(queueName);
            // 6. Create a producer
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 10; i++) {
                // 7. Create a message
                TextMessage message = session.createTextMessage("hello I'm hong:" + i);
                // 8. Send the message
                producer.send(message);
                System.out.println("Message " + i + " sent successfully!");
            }
        } finally {
            // 9. Close the connection, even if sending failed
            connection.close();
        }
    }
}

Integrating Spring with ActiveMQ

spring_activemq_producer (producer), port: 8488

Project structure

├─src
│  ├─main
│  │  ├─java
│  │  │  └─com
│  │  │      └─hong
│  │  │          └─spring
│  │  │              ├─ao
│  │  │              └─common
│  │  └─resources
│  └─test
│      └─java
├─target
│  ├─classes
│  │  └─com
│  │      └─hong
│  │          └─spring
│  │              ├─ao
│  │              └─common
│  └─generated-sources
│      └─annotations
└─web
    └─WEB-INF

Implementation

com.hong.spring.common.MqUtils

package com.hong.spring.common;

import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;


/**
 * @author: csh
 * @Date: 2021/4/19 16:21
 * @Description: MQ utility class
 */
@Service
@Log4j2
public class MqUtils {
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Sends a text message to the given Destination (queue).
     * Falls back to the template's default destination when none is given.
     *
     * @param destination target destination; may be null
     * @param message     text payload
     */
    public void sendTxtMessage(Destination destination, final String message) {
        if (null == destination) {
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                log.info("Sending message {}", message);
                return session.createTextMessage(message);
            }
        });
    }
}

com.hong.spring.UserController

package com.hong.spring;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.ao.UserSaveAO;
import com.hong.spring.common.MqUtils;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.Destination;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description: user endpoint that publishes the new user to ActiveMQ
 */
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {

    @Autowired
    private MqUtils mqUtils;

    @Autowired
    private Destination queueDestination;

    @RequestMapping("save")
    public DataResponse save(UserSaveAO ao) {
        log.info("Add-user request parameters {}", JSONObject.toJSONString(ao));
        if (null == ao) {
            return DataResponse.BuildFailResponse("Parameters must not be empty!");
        }
        try {
            User user = new User();
            BeanUtils.copyProperties(ao, user);
            // Publish the user as JSON; the consumer performs the actual insert
            mqUtils.sendTxtMessage(queueDestination, JSONObject.toJSONString(user));
            return DataResponse.BuildSuccessResponse("User added successfully!");
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged
            log.error("Failed to add user", e);
            return DataResponse.BuildFailResponse("Failed to add user, please retry!");
        }
    }
}

com.hong.spring.ao.UserSaveAO

package com.hong.spring.ao;

import lombok.Data;

import java.io.Serializable;

/**
 * @author: csh
 * @Date: 2021/3/16 11:21
 * @Description: request parameters for saving a user
 */
@Data
public class UserSaveAO implements Serializable {
    private Integer id;
    private String username;
    private Integer age;
}

logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

log4j2.xml



    
        
            
        
        
            
            
        
    
    
        
            
            
        
    

applicationContext.xml





   
   
   
   

   
   

   


   
      
      
      
      
      

   

   
   
   
      
         
         
            
               
                  text/plain;charset=UTF-8
                  text/html;charset=UTF-8
               
            
         
         
      
   

application.properties

logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

activemq.xml



   
   
      
      
      
   
   
   
      
   
   
   
        
        
   
   
   
      
   

    
    
        
        
        
        
        
    

activemq.properties

# Broker address
activemq.brokerURL=tcp://127.0.0.1:61616
activemq.username=admin
activemq.password=admin
# Separate multiple queue names with commas
activemq.queueName=queue_spring_user
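The comment on `activemq.queueName` says several queue names may be listed, separated by commas. How `activemq.xml` consumes that value is not visible in this listing, so the following is only a sketch of one way to split such a value in Java. The class name `QueueNames` and the second queue name `queue_spring_order` are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class QueueNames {
    /** Splits a comma-separated property value into trimmed, non-empty names. */
    static List<String> parse(String raw) {
        List<String> names = new ArrayList<>();
        if (raw == null) {
            return names;
        }
        for (String part : raw.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical property value with two queues; only the first exists in the article
        Properties props = new Properties();
        props.load(new StringReader("activemq.queueName=queue_spring_user, queue_spring_order\n"));
        for (String name : parse(props.getProperty("activemq.queueName"))) {
            System.out.println(name);
        }
    }
}
```

Each resulting name could then be turned into its own destination; with a single name, as in the file above, the list simply has one element.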

WEB-INF/web.xml

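<web-app>
    <servlet>
        <servlet-name>spring_activemq_producer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:activemq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_activemq_producer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>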

Request URL: http://localhost:8488/user/save?username=spring_activemq&age=101

Parameters

username:spring_activemq
age:101

spring_activemq_consumer (consumer), port: 8483

Project structure

│ pom.xml
│ spring_activemq_consumer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─dao
│ │ │ │ UserMapper.java
│ │ │ │
│ │ │ ├─listener
│ │ │ │ UserListener.java
│ │ │ │
│ │ │ ├─mapper
│ │ │ │ UserMapper.xml
│ │ │ │
│ │ │ └─provider
│ │ │ UserServiceImpl.java
│ │ │
│ │ └─resources
│ │ activemq.properties
│ │ activemq.xml
│ │ application.properties
│ │ applicationContext.xml
│ │ jdbc.properties
│ │ log4j2.xml
│ │ logging.properties
│ │ mybatis.xml
│ │
│ └─test
│ └─java
│ │ └─com
│ │ └─hong
│ │ └─spring
│ │ ├─dao
│ │ │ UserMapper.class
│ │ │
│ │ ├─listener
│ │ │ UserListener.class
│ │ │
│ │ ├─mapper
│ │ │ UserMapper.xml
│ │ │
│ │ └─provider
│ │ UserServiceImpl.class
│ │
│ └─generated-sources
│ └─annotations
└─web
    └─WEB-INF
            web.xml

Implementation

com.hong.spring.dao.UserMapper

package com.hong.spring.dao;

import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description: user DAO layer
 */
public interface UserMapper {

    /** Queries all users. */
    List<User> findAllUserList();

    /** Returns the total number of users. */
    int findAllTotal();

    /** Updates a user. */
    int update(User user);

    /** Inserts a user. */
    int save(User user);

    /** Inserts users in a batch. */
    int insertBatch(@Param("list") List<User> list);

    /** Finds a user by id. */
    User findById(int id);

    /** Queries users page by page. */
    List<User> findByPage(UserAO ao);
}

com.hong.spring.listener.UserListener

package com.hong.spring.listener;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

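/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description: listener that persists users received from the queue
 */
@Log4j2
@Service("userListener")
public class UserListener implements MessageListener {

    @Autowired
    private IUserService userService;

    @Override
    public void onMessage(Message message) {
        try {
            // "传进来的数据为" means "incoming data"; the literal is kept because it
            // appears verbatim in the console output shown in this article
            log.info("传进来的数据为{}", JSONObject.toJSONString(message));
            // instanceof also covers the null case and avoids a ClassCastException
            if (message instanceof TextMessage) {
                TextMessage tm = (TextMessage) message;
                String text = tm.getText();
                log.info("Received text content: " + text);
                User user = JSONObject.parseObject(text, User.class);
                userService.save(user);
            }
        } catch (Exception e) {
            log.error("Failed to process message", e);
        }
    }
}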

src/main/java/com/hong/spring/mapper/UserMapper.xml




    
        
        
        
    

    

    

    

    

    
         INSERT INTO user ( user_name, age)
        VALUES (#{username,jdbcType=VARCHAR},
        #{age,jdbcType=INTEGER})
    

    
        insert into user
        ( user_name, age)
        values
        
            (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
        
    

    
        update user
        
            
                user_name=#{username,jdbcType=VARCHAR},
            
            
                age =#{age,jdbcType=INTEGER}
            
        
        where id = #{id,jdbcType=INTEGER}
    

com.hong.spring.provider.UserServiceImpl

package com.hong.spring.provider;

import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description: user service implementation
 */
@Service("userService")
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse<List<User>> findByAll() {
        List<User> allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList, allTotal);
    }

    @Override
    @Transactional
    public DataResponse<Boolean> save(User user) {
        if (null == user) {
            return DataResponse.BuildFailResponse("Required parameters must not be empty!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save > 0);
    }

    @Override
    public DataResponse<Boolean> insertBatch(List<User> list) {
        if (null == list) {
            return DataResponse.BuildFailResponse("Parameters must not be empty!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave > 0);
    }

    @Override
    @Transactional
    public DataResponse<Boolean> update(User user) {
        if (null == user || user.getId() == null) {
            return DataResponse.BuildFailResponse("Required parameters must not be empty!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update > 0);
    }

    @Override
    public DataResponse<User> findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    @Override
    public DataResponse<List<User>> findByPage(UserAO ao) {
        if (ao == null) {
            // The original called setters on the null reference, which throws a
            // NullPointerException; this assumes UserAO has a no-arg constructor
            ao = new UserAO();
            ao.setPage(0);
            ao.setPageSize(10);
        } else {
            // Convert the page number into a row offset
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List<User> byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage, allTotal);
    }
}
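`findByPage` turns a page number into a row offset by multiplying it by the page size and storing the result back in the `page` field. The sketch below isolates that arithmetic with null-safe defaults. `PageRequest` is a hypothetical stand-in, since the real `UserAO` class is not shown in this article, and treating the page number as zero-based is an assumption read from the `page * pageSize` expression.

```java
final class PageRequest {
    final int offset;
    final int pageSize;

    private PageRequest(int offset, int pageSize) {
        this.offset = offset;
        this.pageSize = pageSize;
    }

    /**
     * Builds an offset/size pair from a zero-based page number.
     * Missing or invalid values fall back to page 0 and 10 rows,
     * the same defaults findByPage uses for a null argument.
     */
    static PageRequest of(Integer page, Integer pageSize) {
        int size = (pageSize == null || pageSize <= 0) ? 10 : pageSize;
        int index = (page == null || page < 0) ? 0 : page;
        // multiplyExact throws instead of silently overflowing on huge inputs
        return new PageRequest(Math.multiplyExact(index, size), size);
    }

    public static void main(String[] args) {
        PageRequest request = PageRequest.of(2, 10);
        System.out.println("LIMIT " + request.offset + ", " + request.pageSize);
    }
}
```

Keeping the offset in its own field, rather than overwriting the page number, avoids confusion when the same object is reused after the query.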

src/main/resources/activemq.properties

# Broker address
activemq.brokerURL=tcp://127.0.0.1:61616
activemq.username=admin
activemq.password=admin
# Separate multiple queue names with commas
activemq.queueName=queue_spring_user

src/main/resources/activemq.xml




   

   
   
      
      
      
   
   
   
      
      
   

   
   
        
        
   
   
   
      
   

    
    
        
        
        
        
        
    


   
   
      
      
      
   

src/main/resources/application.properties

logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

src/main/resources/applicationContext.xml





   
   
   
   

   
   
   
   
   
   


   
      
      
      
      
      

   

   
   
   
      
         
         
            
               
                  text/plain;charset=UTF-8
                  text/html;charset=UTF-8
               
            
         
         
      
   


   
      
      
      
      
      

      
      
      
      
      
      
      
      
      

      
      
      
      

      
      
      
      
      
      
      
      
      
      
      
   

   
   
   
      
      
      
      
      
      
   

   
   
      
      
      
      
   
   
   
      
   

src/main/resources/jdbc.properties

# JDBC driver
jdbc.driver=com.mysql.jdbc.Driver
# JDBC connection URL
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
# Database user name
jdbc.user=root
# Database password
jdbc.password=123456

src/main/resources/log4j2.xml



    
        
            
        
        
            
            
        
    
    
        
            
            
        
    

src/main/resources/logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

src/main/resources/mybatis.xml





    
    
        
        
        
        
        
        
        
        
        
        
        
        
    

    
    
        
    


web/WEB-INF/web.xml

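<web-app>
    <servlet>
        <servlet-name>spring_activemq_consumer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:activemq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_activemq_consumer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>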

Tomcat configuration and starting the consumer

[2021-04-19 05:11:09,841] Artifact spring_activemq_consumer:war exploded: Artifact is deployed successfully
[2021-04-19 05:11:09,841] Artifact spring_activemq_consumer:war exploded: Deploy took 10,965 milliseconds
17:11:09.982 [queueListenerContainer-1] INFO com.hong.spring.listener.UserListener - 传进来的数据为{"advisory":false,"allPropertyNames":["JMSPriority","JMSType","JMSXGroupID","JMSReplyTo","JMSXDeliveryCount","JMSExpiration","JMSRedelivered","JMSTimestamp","JMSXGroupSeq","JMSCorrelationID","JMSDeliveryMode"],"arrival":0,"brokerInTime":1618821878763,"brokerInfo":false,"brokerOutTime":1618823469852,"commandId":5,"compressed":false,"connectionControl":false,"consumerControl":false,"content":{"data":"AAAAJ3siYWdlIjoxMCwidXNlcm5hbWUiOiJzcHJpbmdfYWN0aXZlbXEifQ==","length":43,"offset":0},"contentMarshalled":true,"dataStructureType":28,"destination":{"composite":false,"dLQ":false,"dataStructureType":100,"destinationType":1,"destinationTypeAsString":"Queue","marshallAware":false,"physicalName":"queue_spring_user","properties":{"physicalName":"queue_spring_user"},"qualifiedName":"queue://queue_spring_user","queue":true,"queueName":"queue_spring_user","reference":{"all":[{"content":"queue_spring_user","type":"physicalName"}],"className":"org.apache.activemq.command.ActiveMQQueue","factoryClassName":"org.apache.activemq.jndi.JNDIReferenceFactory"},"temporary":false,"topic":false},"droppable":false,"dropped":false,"expiration":0,"expired":false,"groupSequence":0,"inTransaction":false,"jMSDeliveryMode":2,"jMSDestination":{"$ref":"$.destination"},"jMSExpiration":0,"jMSMessageID":"ID:DESKTOP-1VMHJGQ-50761-1618821878478-1:1:1:1:1","jMSPriority":4,"jMSRedelivered":false,"jMSTimestamp":1618821878761,"jMSXGroupFirstForConsumer":false,"jMSXMimeType":"jms/text-message","marshallAware":true,"marshalled":true,"message":{"$ref":"@"},"messageAck":false,"messageDispatch":false,"messageDispatchNotification":false,"messageHardRef":{"$ref":"@"},"messageId":{"brokerSequenceId":92,"dataStructureType":110,"marshallAware":false,"producerId":{"connectionId":"ID:DESKTOP-1VMHJGQ-50761-1618821878478-1:1","dataStructureType":123,"marshallAware":false,"sessionId":1,"value":1},"producerSequenceId":1},"persistent":
true,"priority":4,"producerId":{"$ref":"$.messageId.producerId"},"properties":{},"propertyNames":[],"readOnlyBody":true,"readOnlyProperties":true,"redelivered":false,"redeliveryCounter":0,"response":false,"responseRequired":true,"shutdownInfo":false,"size":1067,"text":"{\"age\":10,\"username\":\"spring_activemq\"}","timestamp":1618821878761,"wireFormatInfo":false}
 INFO | {dataSource-1} inited
17:11:11.731 [queueListenerContainer-1] INFO com.hong.spring.listener.UserListener - 传进来的数据为{"advisory":false,"allPropertyNames":["JMSPriority","JMSType","JMSXGroupID","JMSReplyTo","JMSXDeliveryCount","JMSExpiration","JMSRedelivered","JMSTimestamp","JMSXGroupSeq","JMSCorrelationID","JMSDeliveryMode"],"arrival":0,"brokerInTime":1618821970677,"brokerInfo":false,"brokerOutTime":1618823469852,"commandId":5,"compressed":false,"connectionControl":false,"consumerControl":false,"content":{"data":"AAAAKHsiYWdlIjoxMDEsInVzZXJuYW1lIjoic3ByaW5nX2FjdGl2ZW1xIn0=","length":44,"offset":0},"contentMarshalled":true,"dataStructureType":28,"destination":{"composite":false,"dLQ":false,"dataStructureType":100,"destinationType":1,"destinationTypeAsString":"Queue","marshallAware":false,"physicalName":"queue_spring_user","properties":{"physicalName":"queue_spring_user"},"qualifiedName":"queue://queue_spring_user","queue":true,"queueName":"queue_spring_user","reference":{"all":[{"content":"queue_spring_user","type":"physicalName"}],"className":"org.apache.activemq.command.ActiveMQQueue","factoryClassName":"org.apache.activemq.jndi.JNDIReferenceFactory"},"temporary":false,"topic":false},"droppable":false,"dropped":false,"expiration":0,"expired":false,"groupSequence":0,"inTransaction":false,"jMSDeliveryMode":2,"jMSDestination":{"$ref":"$.destination"},"jMSExpiration":0,"jMSMessageID":"ID:DESKTOP-1VMHJGQ-50907-1618821970418-1:1:1:1:1","jMSPriority":4,"jMSRedelivered":false,"jMSTimestamp":1618821970675,"jMSXGroupFirstForConsumer":false,"jMSXMimeType":"jms/text-message","marshallAware":true,"marshalled":true,"message":{"$ref":"@"},"messageAck":false,"messageDispatch":false,"messageDispatchNotification":false,"messageHardRef":{"$ref":"@"},"messageId":{"brokerSequenceId":97,"dataStructureType":110,"marshallAware":false,"producerId":{"connectionId":"ID:DESKTOP-1VMHJGQ-50907-1618821970418-1:1","dataStructureType":123,"marshallAware":false,"sessionId":1,"value":1},"producerSequenceId":1},"persistent":
true,"priority":4,"producerId":{"$ref":"$.messageId.producerId"},"properties":{},"propertyNames":[],"readOnlyBody":true,"readOnlyProperties":true,"redelivered":false,"redeliveryCounter":0,"response":false,"responseRequired":true,"shutdownInfo":false,"size":1068,"text":"{\"age\":101,\"username\":\"spring_activemq\"}","timestamp":1618821970675,"wireFormatInfo":false}
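The `content.data` field in the log output above is Base64. Judging only from these two log lines, the decoded bytes look like a 4-byte big-endian length followed by the message text (the `length` values 43 and 44 match 4 + 39 and 4 + 40). That layout is an observation from this output, not something the article documents. A small sketch for reading such a value back, with `ContentDecoder` as a made-up helper name:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class ContentDecoder {
    /**
     * Decodes a Base64 "content.data" value, assuming the layout seen in the log:
     * a 4-byte big-endian length prefix followed by that many bytes of text.
     */
    static String decode(String base64) {
        ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(base64));
        int length = buffer.getInt();      // ByteBuffer reads big-endian by default
        byte[] body = new byte[length];
        buffer.get(body);
        // The payload here is plain ASCII JSON, so UTF-8 decoding is sufficient
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Value copied from the first log line above
        System.out.println(decode("AAAAJ3siYWdlIjoxMCwidXNlcm5hbWUiOiJzcHJpbmdfYWN0aXZlbXEifQ=="));
    }
}
```

The decoded string equals the `text` field of the same log entry, which is a quick way to confirm that the message body arrived unchanged.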

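Integrating Spring with ActiveMQ is comparatively simple, and the ActiveMQ API documentation is easy to follow.

Integrating Spring Boot with ActiveMQ

Start ZooKeeper (zk) before launching the applications.

The demo implements the following flow: a request arrives at the API module, which calls the producer. A GET request queries the database directly through MyBatis. A POST request is sent through ActiveMQ for the consumer side to process. A counter i picks the destination: when i % 2 == 0 the message goes to the queue, otherwise it goes to the topic.

springboot_activemq_api, port: 8386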

│  pom.xml
│
└─src
    └─main
        ├─java
        │  └─com
        │      └─hong
        │          └─springboot
        │              │  Application.java
        │              │
        │              └─controller
        │                      IndexController.java
        │                      UserController.java
        │
        └─resources
                application.properties

com.hong.springboot.controller.IndexController

package com.hong.springboot.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: csh
 * @Date: 2021/1/12 10:16
 * @Description: home page
 */
@RestController
public class IndexController {
    @RequestMapping("/")
    public String index(){
        return "Success!";
    }
}

com.hong.springboot.controller.UserController

package com.hong.springboot.controller;


import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.annotation.Reference;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description: user endpoints
 */
@RestController
@Slf4j
@RequestMapping("/user")
public class UserController {
    @Reference
    private IUserService userService;

    @GetMapping("/findByAll")
    public DataResponse<List<User>> findByAll() {
        try {
            return userService.findByAll();
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged
            log.error("Query failed", e);
        }
        return DataResponse.BuildFailResponse("Query failed!");
    }

    @PostMapping("/save")
    public DataResponse save(User ao) {
        if (null == ao || ao.getAge() == null || StringUtils.isBlank(ao.getUsername())) {
            return DataResponse.BuildFailResponse("Parameters must not be empty!");
        }
        DataResponse<Boolean> save = userService.save(ao);
        return save;
    }
}

com.hong.springboot.Application

package com.hong.springboot;


import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description: Spring Boot Dubbo consumer side
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

application.properties

#dubbo configuration
# Application (service) name
dubbo.application.name=springboot_dubbo_consumer
# Registry protocol
dubbo.registry.protocol=zookeeper
# Registry address
dubbo.registry.address=zookeeper://127.0.0.1:2181
# Package scanned for annotations; services found here are registered with ZooKeeper
dubbo.protocol.scan=com.hong.springboot.api
# Protocol port
dubbo.protocol.port=20880
# Protocol name
dubbo.protocol.name=dubbo
# Package scan
dubbo.scan.basePackages=com.hong.springboot.controller

# Avoid port conflicts
server.port=8386

springboot_all/springboot_activemq_api/pom.xml



    
        com.hong.springboot
        springboot_all
        0.0.1-SNAPSHOT
        
    
    4.0.0

    com.hong.springboot
    springboot_activemq_api


    
        1.8
    

    
        
            com.hong.springboot
            springboot_mq_api
            1.0.0-SNAPSHOT
        

        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            com.alibaba.boot
            dubbo-spring-boot-starter
            0.2.0
        
        
        org.apache.zookeeper
        zookeeper
        3.5.4-beta
            
                
                    slf4j-api
                    org.slf4j
                
                
                    slf4j-log4j12
                    org.slf4j
                
            
        

    

    
    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
        
            
                src/main/java
                
                    **/*.properties
                    **/*.xml
                
                false
            
            
                src/main/resources
                
                    **/*.properties
                    **/*.xml
                
                false
            
        
    


springboot_activemq_producer, port: 8387

com.hong.springboot.config.ActiveMQConfig

package com.hong.springboot.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.Queue;
import javax.jms.Topic;


@Configuration
public class ActiveMQConfig {
    @Value("${spring.activemq.queue}")
    private String queueName;

    @Value("${spring.activemq.topic}")
    private String topicName;

    @Value("${spring.activemq.user}")
    private String usrName;

    @Value("${spring.activemq.password}")
    private  String password;

    @Value("${spring.activemq.broker-url}")
    private  String brokerUrl;

    @Bean
    public Queue queue(){
        return new ActiveMQQueue(queueName);
    }

    @Bean
    public Topic topic(){
        return new ActiveMQTopic(topicName);
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        // Switch to publish/subscribe mode; the default is point-to-point (queue) mode
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }
}
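The two listener container factories differ only in `setPubSubDomain(true)`. In JMS terms, a queue hands each message to exactly one consumer, while a topic hands a copy to every active subscriber. The toy model below illustrates that difference in memory, without a broker. The class `DomainDemo` is hypothetical, and the round-robin choice of consumer is a simplification of how a real broker dispatches messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DomainDemo {
    /** Point-to-point: each message lands in exactly one consumer's inbox. */
    static List<List<String>> deliverAsQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            // Round-robin is an assumption made for this sketch only
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    /** Publish/subscribe: every subscriber receives a copy of every message. */
    static List<List<String>> deliverAsTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<String>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("queue: " + deliverAsQueue(messages, 2));
        System.out.println("topic: " + deliverAsTopic(messages, 2));
    }
}
```

This is why a listener meant for the topic has to use the factory with pub/sub enabled: with the default factory it would be attached to a queue of the same name instead.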

com.hong.springboot.config.DruidConfig

package com.hong.springboot.config;

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * @author: csh
 * @Date: 2021/1/8 18:08
 * @Description: data source configuration
 */
@Configuration
public class DruidConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource(){
        return new DruidDataSource();
    }
}

com.hong.springboot.dao.UserMapper

package com.hong.springboot.dao;

import com.hong.springboot.entity.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description: user DAO layer
 */

public interface UserMapper {
    @Select("select id,user_name,age from user")
    List<User> findAllUser();

    @Insert("insert into user (user_name,age) values(#{username},#{age})")
    int insert(User user);
}

com.hong.springboot.provider.UserServiceImpl

package com.hong.springboot.provider;


import com.alibaba.druid.util.StringUtils;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.transaction.annotation.Transactional;

import javax.jms.Queue;
import javax.jms.Topic;
import java.util.List;


/**
 * @author: csh
 * @Date: 2020/8/18 15:16
 * @Description: user service implementation
 */
@Service(interfaceClass = IUserService.class,timeout = 6000)
@Slf4j
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    @Override
    public DataResponse<List<User>> findByAll() {
        List<User> allUserList = userDao.findAllUser();
        return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
    }
    //Call counter, used to alternate between queue and topic
    private static int i = 0;

    @Override
    public DataResponse<Boolean> save(User userAO) {
        i++;
        log.info("需要activemq添加的用户信息{}",JSONObject.toJSONString(userAO));
        try {
            //Even counts are sent to the queue, odd counts to the topic.
            if(i%2==0){
                jmsMessagingTemplate.convertAndSend(queue,JSONObject.toJSONString(userAO));
            }else{
                jmsMessagingTemplate.convertAndSend(topic,JSONObject.toJSONString(userAO));
            }
        }catch (Exception e){
            e.printStackTrace();
            //Report the failure instead of wrapping false in a success response
            return DataResponse.BuildFailResponse("失败",false);
        }
        return DataResponse.BuildSuccessResponse(true);
    }

    @Transactional
    @Override
    public DataResponse<Boolean> reallySave(User user) {
        log.info("要添加的用户信息{}",JSONObject.toJSONString(user));
        if(null==user || user.getAge()==null || StringUtils.isEmpty(user.getUsername())){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int insert = userDao.insert(user);
        return insert>0?DataResponse.BuildSuccessResponse(true):DataResponse.BuildFailResponse("失败",false);
    }
}
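
The `save` method above alternates between the queue and the topic with a shared `static int`, which is not safe when several request threads call it at the same time. A minimal sketch of the same alternation using `AtomicInteger` follows. The class and method names (`DestinationPicker`, `nextIsQueue`) are hypothetical and not part of the original project.

```java
import java.util.concurrent.atomic.AtomicInteger;

class DestinationPicker {
    // Shared call counter; AtomicInteger makes the increment thread-safe
    private final AtomicInteger counter = new AtomicInteger();

    /** Returns true when the next message should go to the queue, false for the topic. */
    boolean nextIsQueue() {
        // Same rule as the original code: even counts -> queue, odd counts -> topic
        return counter.incrementAndGet() % 2 == 0;
    }

    public static void main(String[] args) {
        DestinationPicker picker = new DestinationPicker();
        for (int n = 1; n <= 4; n++) {
            System.out.println("call " + n + " -> " + (picker.nextIsQueue() ? "queue" : "topic"));
        }
    }
}
```

In the service, the result of `nextIsQueue()` would decide whether `convertAndSend` receives the `queue` or the `topic` bean.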

com.hong.springboot.Application

package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@MapperScan("com.hong.springboot.dao")
@EnableDubbo
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

application.properties

rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000

#dubbo configuration
# Service (application) name
dubbo.application.name=springboot_rabbitmq_producer

dubbo.registry.protocol=zookeeper
# Registry address
dubbo.registry.address=zookeeper://127.0.0.1:2181
# Package scanned for annotated services to register with ZooKeeper
dubbo.protocol.scan=com.hong.springboot.api
# Protocol port
dubbo.protocol.port=20881
# Protocol name
dubbo.protocol.name=dubbo

# Avoid port conflicts
server.port=8387
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456


# MyBatis configuration
mybatis.typeAliasesPackage=com.hong.springboot.entity

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
# In-memory ActiveMQ
#spring.activemq.in-memory=true
spring.activemq.queue=queue_springboot_user
spring.activemq.topic=topic_springboot_user

springboot_all/springboot_activemq_producer/pom.xml



    
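<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_activemq_producer</artifactId>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>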
            
        
    


springboot_activemq_consumer (port: 8388)

com.hong.springboot.config.ActiveMQConfig

package com.hong.springboot.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.Queue;
import javax.jms.Topic;


@Configuration
public class ActiveMQConfig {
    @Value("${spring.activemq.queue}")
    private String queueName;

    @Value("${spring.activemq.topic}")
    private String topicName;

    @Value("${spring.activemq.user}")
    private String usrName;

    @Value("${spring.activemq.password}")
    private  String password;

    @Value("${spring.activemq.broker-url}")
    private  String brokerUrl;

    @Bean
    public Queue queue(){
        return new ActiveMQQueue(queueName);
    }

    @Bean
    public Topic topic(){
        return new ActiveMQTopic(topicName);
    }




    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        bean.setPubSubDomain(false);
        return bean;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        //Use publish/subscribe mode; the default is point-to-point (queue) mode
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }
}
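
Spring fills each `@Value("${...}")` field by looking the key up in application.properties, so the value a field receives depends only on the key, not on the field name. The plain-Java sketch below mimics that lookup with `java.util.Properties` to show that `spring.activemq.user` and `spring.activemq.queue` return different values. No Spring is involved, and the class name `PlaceholderLookupSketch` is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class PlaceholderLookupSketch {
    /** Parses key=value lines the same way a .properties file is read. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // A StringReader should not fail, but load() declares IOException
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(
                "spring.activemq.user=admin\n"
              + "spring.activemq.queue=queue_springboot_user\n"
              + "spring.activemq.topic=topic_springboot_user\n");
        // The broker login name, not a destination name
        System.out.println(props.getProperty("spring.activemq.user"));
        // The queue name that the Queue bean is meant to wrap
        System.out.println(props.getProperty("spring.activemq.queue"));
    }
}
```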

com.hong.springboot.listener.UserListener

package com.hong.springboot.listener;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import javax.jms.TextMessage;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description: user message listener
 */
@Service
@Log4j2
public class UserListener {

    @Reference
    private IUserService userService;

    /**
     * Consumes user messages from the queue and saves them.
     *
     * @author: csh
     * @date: 2021/4/20 10:56
     */
    @JmsListener(destination = "${spring.activemq.queue}",containerFactory = "jmsListenerContainerQueue")
    public void onMessage(TextMessage msg) {
        log.info("进入队列消费");
        if(null==msg){
            return;
        }
        try {
            User user = JSONObject.parseObject(msg.getText(),User.class);
            log.info("最终要添加的值{}",JSONObject.toJSONString(user));
            DataResponse<Boolean> save = userService.reallySave(user);
            if(save==null || !save.getData()){
                log.info("添加失败,原因{}",JSONObject.toJSONString(save));
            }
        }catch (Exception e){
            log.error("添加出错",e);
        }
    }

    @JmsListener(destination = "${spring.activemq.topic}",containerFactory = "jmsListenerContainerTopic")
    public void onMessage2(TextMessage msg) {
        log.info("进入topic消费");
        if(null==msg){
            return;
        }
        try {
            User user = JSONObject.parseObject(msg.getText(),User.class);
            log.info("最终要添加的值{}",JSONObject.toJSONString(user));
            DataResponse<Boolean> save = userService.reallySave(user);
            if(save==null || !save.getData()){
                log.info("添加失败,原因{}",JSONObject.toJSONString(save));
            }
        }catch (Exception e){
            log.error("添加出错",e);
        }
    }

}

com.hong.springboot.Application

package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
@EnableJms
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

application.properties

rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000

# Avoid port conflicts
server.port=8388

#dubbo configuration
# Service (application) name
dubbo.application.name=springboot_rocketmq_consumer

dubbo.registry.protocol=zookeeper
# Registry address
dubbo.registry.address=zookeeper://127.0.0.1:2181
# Package scanned for annotated services to register with ZooKeeper
dubbo.protocol.scan=com.hong.springboot.api
# Protocol port
dubbo.protocol.port=20882
# Protocol name
dubbo.protocol.name=dubbo
# Packages to scan
dubbo.scan.basePackages=com.hong.springboot.listener

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#spring.activemq.pool.enabled=true
#spring.activemq.pool.max-connections=50
# In-memory ActiveMQ
#spring.activemq.in-memory=true
spring.activemq.queue=queue_springboot_user
spring.activemq.topic=topic_springboot_user

springboot_all/springboot_activemq_consumer/pom.xml



    
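<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_activemq_consumer</artifactId>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>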
            
        
    


Then send the following request: http://localhost:8386/user/save?username=springboot_activemq&age=1000

username:springboot_activemq
age:1000
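
For scripted testing, the request above can also be built and sent from plain Java. This is a rough sketch only: it assumes the endpoint accepts GET (the query-string form suggests so, but the controller is not shown here), and the class and method names (`SaveRequestSketch`, `buildSaveUrl`, `get`) are hypothetical.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

class SaveRequestSketch {
    /** Builds the /user/save URL with the username URL-encoded. */
    static String buildSaveUrl(String baseUrl, String username, int age) {
        try {
            return baseUrl + "/user/save?username=" + URLEncoder.encode(username, "UTF-8") + "&age=" + age;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM
            throw new IllegalStateException(e);
        }
    }

    /** Sends a GET request and returns the HTTP status code; needs the services running. */
    static int get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        // Only prints the URL; call get(url) once the services are up
        System.out.println(buildSaveUrl("http://localhost:8386", "springboot_activemq", 1000));
    }
}
```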

The result is as follows:

2021-04-20 11:07:42.501  INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 进入topic消费
2021-04-20 11:07:42.571  INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 最终要添加的值{"age":1000,"username":"springboot_activemq"}
2021-04-20 11:07:45.861  INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 进入队列消费
2021-04-20 11:07:45.861  INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 最终要添加的值{"age":1000,"username":"springboot_activemq"}

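That essentially completes the Spring Boot and ActiveMQ integration. It is simple and easy to pick up, and the example also wires in RPC (Dubbo) for the query and insert calls.

Topic vs. queue: what is the difference?

This article uses both queues and topics, so how do they differ?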

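| Item | Topic | Queue |
| --- | --- | --- |
| Overview | Publish/subscribe messaging | Point-to-point messaging |
| Repeated consumption | A message can be consumed more than once (by each subscriber) | Point-to-point, so a message cannot be consumed more than once |
| How messages are received | No explicit request is needed; the server can push messages | The receiver actively fetches messages from the queue |
| State | Topic data is not persisted by default; it is stateless. | Queue data is stored as files on the MQ server by default; ActiveMQ, for example, typically keeps it under $AMQ_HOME\data\kr-store\data. It can also be configured to use database storage. |
| Delivery guarantee | There is no guarantee that every message a publisher sends reaches every subscriber. | A queue guarantees that every message is received by a receiver. |
| Message loss | In general, when a publisher sends a message to a topic, only subscribers currently listening on that topic receive it; if no subscriber is listening, the message is lost. | A sender sends a message to the target queue and receivers can consume it asynchronously. A message that no receiver has picked up yet is not lost. |
| Publish/receive strategy | One-to-many: every subscriber listening on the same topic receives the publisher's message. Each subscriber notifies the MQ server after receiving it. | One-to-one: a message from a sender is received by only one receiver. After receiving it, the receiver notifies the MQ server, which then deletes the message from the queue or handles it otherwise. |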
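
The difference in delivery semantics can be illustrated with a small in-memory simulation. This is only a sketch of the two models (non-durable topic, single-threaded), not of how ActiveMQ is implemented, and all names (`QueueVsTopicSketch`, `SimpleQueue`, `SimpleTopic`) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class QueueVsTopicSketch {
    /** Point-to-point: messages wait until exactly one receiver takes them. */
    static class SimpleQueue {
        private final Deque<String> pending = new ArrayDeque<>();

        void send(String msg) { pending.addLast(msg); }

        /** Returns the oldest pending message, or null if none is waiting. */
        String receive() { return pending.pollFirst(); }
    }

    /** Publish/subscribe: every subscriber present at publish time gets its own copy. */
    static class SimpleTopic {
        private final List<List<String>> subscribers = new ArrayList<>();

        /** Registers a subscriber and returns its inbox. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscribers.add(inbox);
            return inbox;
        }

        void publish(String msg) {
            // With no subscribers the loop does nothing and the message is dropped
            for (List<String> inbox : subscribers) {
                inbox.add(msg);
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.send("q1");                     // no receiver yet, the message is kept
        System.out.println(queue.receive());  // q1
        System.out.println(queue.receive());  // null: the message was consumed once

        SimpleTopic topic = new SimpleTopic();
        topic.publish("t0");                  // nobody is listening, the message is lost
        List<String> a = topic.subscribe();
        List<String> b = topic.subscribe();
        topic.publish("t1");
        System.out.println(a + " " + b);      // [t1] [t1]
    }
}
```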

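| Destination type | Persistent | Durable subscriber | Message kept when the consumer starts late | Message kept after a broker restart |
| --- | --- | --- | --- | --- |
| Queue | N | - | Y | N |
| Queue | Y | - | Y | Y |
| Topic | N | N | N | N |
| Topic | N | Y | Y | N |
| Topic | Y | N | N | N |
| Topic | Y | Y | Y | Y |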
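
The six rows of this table reduce to two boolean rules. The sketch below simply encodes the table as given; it does not query a broker, and the class and method names (`RetentionRulesSketch`, `keptForLateConsumer`, `keptAfterBrokerRestart`) are hypothetical.

```java
class RetentionRulesSketch {
    /** Is a message kept for a consumer that starts after the message was sent? */
    static boolean keptForLateConsumer(boolean isTopic, boolean durableSubscriber) {
        // Queues always hold messages; topics only do so for durable subscribers
        return !isTopic || durableSubscriber;
    }

    /** Is a message kept across a broker restart? */
    static boolean keptAfterBrokerRestart(boolean isTopic, boolean persistent, boolean durableSubscriber) {
        // Persistence is required in both cases; topics also need a durable subscriber
        return persistent && (!isTopic || durableSubscriber);
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean persistent : flags) {
            // Queue rows: the durable-subscriber column does not apply
            System.out.println("Queue persistent=" + persistent
                    + " late=" + keptForLateConsumer(false, false)
                    + " restart=" + keptAfterBrokerRestart(false, persistent, false));
        }
        for (boolean persistent : flags) {
            for (boolean durable : flags) {
                System.out.println("Topic persistent=" + persistent + " durable=" + durable
                        + " late=" + keptForLateConsumer(true, durable)
                        + " restart=" + keptAfterBrokerRestart(true, persistent, durable));
            }
        }
    }
}
```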

Reference:

https://blog.csdn.net/admin1973/article/details/60125938

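Load testing

Load testing with JMeter showed a large message backlog. HTTP and MySQL account for part of it, but the backlog was still far too large, and the failure rate reached 97%.

Conclusion

Although ActiveMQ is one of the earliest message middleware products, it is not widely used commercially in China, because Kafka, RocketMQ, and RabbitMQ offer the same features with higher performance and support for more scenarios. Of course, no option is simply good or bad; what matters is whether it fits. This article covers only a basic integration, and a deeper look will follow when time allows.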
