spring整合各种中间件(RocketMQ、kafka、RabbitMQ、TubeMQ、NSQ)

小编:管理员 209阅读 2022.08.01

本文源码下载:https://gitee.com/hong99/spring/issues/I1N1DF

TubeMQ是什么?

简介

TubeMQ是2019年腾讯在ApacheCon开源的一个消息中间件系统,性能优越。经过近7年、万亿规模的海量数据沉淀,TubeMQ目前日均接入量超过25万亿条消息。较之于其他的开源MQ组件,TubeMQ长期应用于真实生产环境中,在稳定性、性能和成本方面都有着核心优势。

https://inlong.apache.org/zh-cn/docs/quick_start.html

功能介绍

纯 Java 实现语言

引入 Master 协调节点:相比 Kafka 依赖于 Zookeeper 完成元数据的管理和实现 HA 保障不同,TubeMQ 系统采用的是自管理的元数据仲裁机制方式进行,Master 节点通过采用内嵌数据库 BDB 完成集群内元数据的存储、更新以及 HA 热切功能,负责 TubeMQ 集群的运行管控和配置管理操作,对外提供接口等;通过 Master 节点,TubeMQ 集群里的 Broker 配置设置、变更及查询实现了完整的自动化闭环管理,减轻了系统维护的复杂度

服务器侧消费负载均衡:TubeMQ 采用的是服务侧负载均衡的方案,而不是客户端侧操作,提升系统的管控能力同时简化客户端实现,更便于均衡算法升级

系统行级锁操作:对于 Broker 消息读写中存在中间状态的并发操作采用行级锁,避免重复问题

Offset 管理调整:Offset 由各个 Broker 独自管理,ZK 只作数据持久化存储用(最初考虑完全去掉ZK依赖,考虑到后续的功能扩展就暂时保留)

消息读取机制的改进:TubeMQ 采用的是消息随机读取模式,同时为了降低消息时延又增加了内存缓存读写,对于带 SSD 设备的机器,增加消息滞后转 SSD 消费的处理,解决消费严重滞后时吞吐量下降以及 SSD 磁盘容量小、刷盘次数有限的问题,使其满足业务快速生产消费的需求

消费者行为管控:支持通过策略实时动态地控制系统接入的消费者行为,包括系统负载高时对特定业务的限流、暂停消费,动态调整数据拉取的频率等;

服务分级管控:针对系统运维、业务特点、机器负载状态的不同需求,系统支持运维通过策略来动态控制不同消费者的消费行为,比如是否有权限消费、消费时延分级保证、消费限流控制,以及数据拉取频率控制等

系统安全管控:根据业务不同的数据服务需要,以及系统运维安全的考虑,TubeMQ 系统增加了 TLS 传输层加密管道,生产和消费服务的认证、授权,以及针对分布式访问控制的访问令牌管理,满足业务和系统运维在系统安全方面的需求

资源利用率提升改进:相比于 Kafka,TubeMQ 采用连接复用模式,减少连接资源消耗;通过逻辑分区构造,减少系统对文件句柄数的占用,通过服务器端过滤模式,减少网络带宽资源使用率;通过剥离对 Zookeeper 的使用,减少 Zookeeper 的强依赖及瓶颈限制

客户端改进:基于业务使用上的便利性以,我们简化了客户端逻辑,使其做到最小的功能集合,我们采用基于响应消息的接收质量统计算法来自动剔出坏4的 Broker 节点,基于首次使用时作连接尝试来避免大数据量发送时发送受阻。

Portal:负责对外交互和运维操作的Portal部分,包括API和Web两块,API对接集群之外的管理系统,Web是在API基础上对日常运维功能做的页面封装;

Master: 负责集群控制的Control部分,该部分由1个或多个Master节点组成,Master HA通过Master节点间心跳保活、实时热备切换完成(这是大家使用TubeMQ的Lib时需要填写对应集群所有Master节点地址的原因),主Master负责管理整个集群的状态、资源调度、权限检查、元数据查询等;

Broker: 负责实际数据存储的Store部分,该部分由相互之间独立的Broker节点组成,每个Broker节点对本节点内的Topic集合进行管理,包括Topic的增、删、改、查,Topic内的消息存储、消费、老化、分区扩容、数据消费的offset记录等,集群对外能力,包括Topic数目、吞吐量、容量等,通过水平扩展Broker节点来完成;

Client: 负责数据生产和消费的Client部分,该部分我们以Lib形式对外提供,大家用得最多的是消费端,相比之前,消费端现支持Push、Pull两种数据拉取模式,数据消费行为支持顺序和过滤消费两种。对于Pull消费模式,支持业务通过客户端重置精确offset以支持业务exactly-once消费,同时,消费端新推出跨集群切换免重启的BidConsumer客户端;

Zookeeper: 负责offset存储的zk部分,该部分功能已弱化到仅做offset的持久化存储,考虑到接下来的多节点副本功能该模块暂时保留。

详细请对考这里:https://inlong.apache.org/zh-cn/docs/architecture.html

源码实现

直接上docker (强列建议用docker哈,之前的tars自己部署搞了好几个星期....)

docker run -p 8080:8080 -p 8000:8000 -p 8123:8123 --name tubemq -d apachetubemq/tubemq-all:latest
复制

然后运行:localhost:8080 如下

集群部署参考:https://inlong.apache.org/zh-cn/docs/quick_start.html

新增topic

默认这里的授权字段是:abc

纯java实现

引入jar包


    0.8.0-incubating



    
        org.apache.tubemq
        tubemq-client
        ${tubemq-client-version}
    

    
        com.alibaba
        fastjson
        1.2.71
    
复制

生产端

生产-异步消息

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentCallback;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
/**
 *
 * 功能描述: 异步消息
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/5/16 18:04
 */
public final class AsyncProducerExample {

   public static void main(String[] args) throws Throwable {
       final String masterHostAndPort = "localhost:8000";
       final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
       final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
       final MessageProducer messageProducer = messageSessionFactory.createProducer();
       final String topic = "java_tubemq";
       final String body = "发送异步消息!";
       byte[] bodyData = StringUtils.getBytesUtf8(body);
       messageProducer.publish(topic);
       final Message message = new Message(topic, bodyData);
       messageProducer.sendMessage(message, new MessageSentCallback(){
           public void onMessageSent(MessageSentResult result) {
               if (result.isSuccess()) {
                   System.out.println("async send message : " + JSONObject.toJSONString(message));
               } else {
                   System.out.println("async send message failed : " + result.getErrMsg());
               }
           }
           public void onException(Throwable e) {
               System.out.println("async send message error : " + e);
           }
       });
       messageProducer.shutdown();
   }

}
复制

发送结果

async send message : {"data":"5Y+R6YCB5byC5q2l5raI5oGv77yB","flag":0,"indexId":0,"topic":"java_tubemq"}
复制

同步消息-生产

import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
/**
 *
 * 功能描述:同步消息
 *
 * @param: 
 * @return: 
 * @auther: csh
 * @date: 2021/5/16 17:45
 */
public final class SyncProducerExample {

   public static void main(String[] args) throws Throwable {
       //主节点地址
       final String masterHostAndPort = "localhost:8000";
       final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
       final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
       final MessageProducer messageProducer = messageSessionFactory.createProducer();
       //topic 刚刚创建那个
       final String topic = "java_tubemq";
       final String body = "hello,i'm hong!How are you?";
       byte[] bodyData = StringUtils.getBytesUtf8(body);
       messageProducer.publish(topic);
       Message message = new Message(topic, bodyData);
       MessageSentResult result = messageProducer.sendMessage(message);
       //判断结果 如果成功打印出 sync send message 加上消息内容
       if (result.isSuccess()) {
           System.out.println("同步发送出去的消息: " + JSON.toJSONString(message));
       }
       //关闭服务
       messageProducer.shutdown();
   }
}
复制

发送结果

同步发送出去的消息: {"data":"aGVsbG8saSdtIGhvbmchSG93IGFyZSB5b3U/","flag":0,"indexId":0,"topic":"java_tubemq"}
复制

消费端

import com.alibaba.fastjson.JSONObject;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.utils.ThreadUtils;

import java.util.List;
/**
 *
 * 功能描述: 通过pull拉取消息
 *
 * @param: 
 * @return: 
 * @auther: csh
 * @date: 2021/5/16 17:51
 */
public class PullConsumerExample {

     public static void main(String[] args) throws Throwable {
         //服务地址
         final String masterHostAndPort = "localhost:8000";
         //topic
         final String topic = "java_tubemq";
         //消费组
         final String group = "hong-group1";
         final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
         consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         final PullMessageConsumer messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
         messagePullConsumer.subscribe(topic, null);
         messagePullConsumer.completeSubscribe();
         // wait for client to join the exact consumer queue that consumer group allocated
         while (!messagePullConsumer.isPartitionsReady(1000)) {
             ThreadUtils.sleep(1000);
         }
         while (true) {
             ConsumerResult result = messagePullConsumer.getMessage();
             if (result.isSuccess()) {
                 List messageList = result.getMessageList();
                 for (Message message : messageList) {
                     System.out.println("接收到的消息: " + JSONObject.toJSONString(message)+"内容是:"+new String(message.getData()));
                 }
                 messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
             } else {
                 if (result.getErrCode() == 400) {
                     ThreadUtils.sleep(100);
                 } else {
                     if (result.getErrCode() != 404) {
                         System.out.println(String.format("Receive messages errorCode is %d, Error message is %s", result.getErrCode(), result.getErrMsg()));
                     }
                 }
             }
         }
     } 

 }
复制

结果

接收到的消息: {"data":"5Y+R6YCB5byC5q2l5raI5oGv77yB","flag":0,"indexId":1200513646816395264,"topic":"java_tubemq"}内容是:发送异步消息!
接收到的消息: {"data":"aGVsbG8saSdtIGhvbmchSG93IGFyZSB5b3U/","flag":0,"indexId":2057144306094833664,"topic":"java_tubemq"}内容是:hello,i'm hong!How are you?
复制
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.MessageListener;
import org.apache.tubemq.client.consumer.PushMessageConsumer;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
 *
 * 功能描述: push消息模式
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/5/16 17:53
 */
public class PushConsumerExample {
     public static void main(String[] args) throws Throwable {
         final String masterHostAndPort = "localhost:8000";
         final String topic = "java_tubemq";
         final String group = "hong-group";
         final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
         consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         final PushMessageConsumer pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
         pushConsumer.subscribe(topic, null, new MessageListener() {

             public void receiveMessages(List messages) throws InterruptedException {
                 for (Message message : messages) {
                     System.out.println("PUSH接收到的消息 : " + new String(message.getData()));
                 }
             }

             public Executor getExecutor() {
                 return null;
             }

             public void stop() {
                 //
             }
         });
         pushConsumer.completeSubscribe();
         CountDownLatch latch = new CountDownLatch(1);
         latch.await(10, TimeUnit.MINUTES);
     }
 }
复制

结果

PUSH接收到的消息 : 发送异步消息!
PUSH接收到的消息 : hello,i'm hong!How are you?
复制

关于数据拉取模式支持Push、Pull的区别:

Push客户端:TubeMQ最初消费端版本只提供Push模式的消费,这种模式能比较快速地消费数据,减轻服务端压力,但同时也带来一个问题,业务使用的时候因为无法控制拉取频率,从而容易形成数据积压数据处理不过来;

带消费中止/继续的Push客户端: 在收到业务反馈能否控制Push拉取动作的需求后,我们增加了resumeConsume()/pauseConsume()函数对,让业务可以模拟水位线控制机制,状态比较繁忙时调用pauseConsume()函数来中止Lib后台的数据拉取,在状态恢复后,再调用resumeConsume()通知Lib后台继续拉取数据;

Pull客户端: 我们后来版本里增加了Pull客户端,该客户端有别于Push客户端,是由业务而非Lib主动的拉取消息并对数据处理的结果进行成功与否的确认,将数据处理的主动权留给业务。这样处理后,虽然服务端压力有所提升,但业务消费时积压情况可大大缓解。

客户端与服务器端RPC交互过程:

客户端要维持已发请求消息的本地保存,直到RPC超时,或者收到响应消息,响应消息通过请求发送时生成的SerialNo关联;从服务器端收到的Broker信息,以及Topic信息,SDK要保存在本地,并根据最新的返回信息进行更新,以及定期的上报给服务器端;SDK要维持到Master或者Broker的心跳,如果发现Master反馈注册超时错误时,要进行重注册操作;SDK要基于Broker进行连接建立,同一个进程不同对象之间,要允许业务进行选择,是支持按对象建立连接,还是按照进程建立连接。

spring整合TubeMQ

网上基本没有找着相关spring整合tubemq以及相关的学习资料,除了官网那些很久也没更新的...所以在java基础之上做一个简单的整合...如下

spring生产

│ pom.xml
│ spring_tubemq_producer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ CommonTopic.java
│ │ │ │ TubeMqProducer.java
│ │ │ │
│ │ │ └─controller
│ │ │ │ UserController.java
│ │ │ │
│ │ │ └─ao
│ │ │ UserSaveAO.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ log4j2.xml
│ │ logging.properties
│ │ tubemq.properties
│ │ tubemq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml
复制

com.hong.spring.config.CommonTopic

package com.hong.spring.config;

/**
 * @author: csh
 * @Date: 2021/5/18 10:56
 * @Description:公共的topic
 */
public class CommonTopic  {
    //用户mq
    public static final String TUBETOPIC="tubemq_spring_user";
}
复制

com.hong.spring.config.TubeMqProducer

package com.hong.spring.config;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentCallback;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;

/**
 * @author: csh
 * @Date: 2021/4/27 18:06
 * @Description:tubemq配置
 */
@Log4j2
public class TubeMqProducer {


    /**服务端 */
    private static MessageProducer messageProducer;
    /**服务地址 */
    private String url;


    public TubeMqProducer(String url) {
        this.url = url;
    }

    public void init () throws TubeClientException {

        TubeClientConfig clientConfig =new TubeClientConfig(url);
        MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
        messageProducer = messageSessionFactory.createProducer();
    }

    /**
     *
     * 功能描述: 发送消息
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/4/28 17:05
     */
    private static Boolean resultFlag;
    public Boolean send(String msg,String topic)  {
        try {
            byte[] bodyData = StringUtils.getBytesUtf8(msg);
            messageProducer.publish(topic);
            final Message message = new Message(topic, bodyData);
            messageProducer.sendMessage(message, new MessageSentCallback(){
                public void onMessageSent(MessageSentResult result) {
                    if (result.isSuccess()) {
                        resultFlag=true;
                        log.info("同步发送消息成功 : " + JSONObject.toJSONString(message));
                    } else {
                        resultFlag =false;
                        log.info("发送消息出错 : " + result.getErrMsg());
                    }
                }
                public void onException(Throwable e) {
                    log.error("同步消息出错 : " + e);
                }
            });
            return resultFlag;
        }catch (Exception e){
            log.error("发送失败{}",e);
        }
        return false;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    /**
     *
     * 功能描述: 关闭
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/5/18 14:50
     */
    public void close(){
        try {
            if(null!=messageProducer){
                messageProducer.shutdown();
            }
        }catch (Exception e){
            log.error("关闭失败{}",e);
        } catch (Throwable throwable) {
            log.error("关闭失败{}",throwable);
            throwable.printStackTrace();
        }

    }
}
复制

com.hong.spring.controller.ao.UserSaveAO

package com.hong.spring.controller.ao;

import lombok.Data;

import java.io.Serializable;

/**
 * @author: csh
 * @Date: 2021/3/16 11:21
 * @Description:用户入参
 */
@Data
public class UserSaveAO implements Serializable {
    private Integer id;
    private String username;
    private Integer age;
}
复制

com.hong.spring.controller.UserController

package com.hong.spring.controller;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.config.CommonTopic;
import com.hong.spring.config.TubeMqProducer;
import com.hong.spring.controller.ao.UserSaveAO;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {

    @Autowired
    private TubeMqProducer push;


    @RequestMapping("save")
    public DataResponse save(UserSaveAO ao){
        log.info("添加用户入参{}",JSONObject.toJSONString(ao));
        if(null==ao){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
       try {
           Boolean send = push.send(JSONObject.toJSONString(ao),CommonTopic.TUBETOPIC);
           if(null==send || !send){
               return DataResponse.BuildFailResponse("添加用户失败!");
           }
           return DataResponse.BuildFailResponse("添加用户成功!");
       }catch (Exception e){
           log.error("添加出错{}",e);
           return DataResponse.BuildFailResponse("添加出错请重试!");
       }
    }
}
复制

相关配置文件

application.properties

logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
复制

applicationContext.xml





   
   
   
   

   
   

   


   
      
      
      
      
      
   

   
   
   
      
         
         
            
               
                  text/plain;charset=UTF-8
                  text/html;charset=UTF-8
               
            
         
         
      
   
复制

log4j2.xml



    
        
            
        
        
            
            
        
    
    
        
            
            
        
    
复制

logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
复制

tubemq.properties

tubemq.url=localhost:8000
复制

tubemq.xml







   
   
      
   
复制

WEB-INF/web.xml



    
        spring_tubemq_producer
        org.springframework.web.servlet.DispatcherServlet
        
            contextConfigLocation
            classpath:applicationContext.xml,
                classpath:tubemq.xml
            
        
        1
    

    
        encodingFilter
        org.springframework.web.filter.CharacterEncodingFilter
        
            encoding
            UTF-8
        
        
            forceEncoding
            true
        
    
    
        encodingFilter
        /*
    

    
        spring_tubemq_producer
        /
    
复制

spring_mq/spring_tubemq_producer/pom.xml



    
        spring_mq
        com.hong
        1.0-SNAPSHOT
    
    4.0.0

    com.hong.tubemq
    spring_tubemq_producer

    
        0.8.0-incubating
    

    
        
            spring_mq_common_api
            1.0-SNAPSHOT
            com.hong
        


        
            org.apache.tubemq
            tubemq-client
            ${tubemq-client-version}
        

        
            com.alibaba
            fastjson
            1.2.71
        

    

    
        
            
                src/main/java
                
                    **/*.properties
                    **/*.xml
                
                false
            
            
                src/main/resources
                
                    **/*.properties
                    **/*.xml
                
                false
            
        
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
            
        
    
复制

tomcat启动配置

postman发送配置

username:spring_tubemq
age:1
复制

结果

因为我的topic没有建,所以可以直接确认这个默认的情况下tubemq的topic需要手动来建。

重新添加topic

11:23:22.108 [http-nio-8683-exec-3] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1,"username":"spring_tubemq"}
11:23:22.108 [http-nio-8683-exec-3] INFO org.apache.tubemq.client.producer.ProducerManager - [Publish begin 1] publish topic tubemq_spring_user, address = org.apache.tubemq.client.producer.ProducerManager@23c7763a
11:23:22.197 [pool-6-thread-2] INFO com.hong.spring.config.TubeMqProducer - 同步发送消息成功 : {"data":"eyJhZ2UiOjEsInVzZXJuYW1lIjoic3ByaW5nX3R1YmVtcSJ9","flag":0,"indexId":0,"topic":"tubemq_spring_user"}
11:24:54.517 [http-nio-8683-exec-6] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1,"username":"spring_tubemq"}
11:24:54.517 [http-nio-8683-exec-6] INFO org.apache.tubemq.client.producer.ProducerManager - [Publish begin 1] publish topic tubemq_spring_user, address = org.apache.tubemq.client.producer.ProducerManager@23c7763a
11:24:54.519 [pool-6-thread-1] INFO com.hong.spring.config.TubeMqProducer - 同步发送消息成功 : {"data":"eyJhZ2UiOjEsInVzZXJuYW1lIjoic3ByaW5nX3R1YmVtcSJ9","flag":0,"indexId":0,"topic":"tubemq_spring_user"}
复制

消费端

│ pom.xml
│ spring_tubemq_consumer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ TubeMqConsumer.java
│ │ │ │
│ │ │ ├─dao
│ │ │ │ UserMapper.java
│ │ │ │
│ │ │ ├─listener
│ │ │ │ UserListener.java
│ │ │ │
│ │ │ ├─mapper
│ │ │ │ UserMapper.xml
│ │ │ │
│ │ │ └─provider
│ │ │ UserServiceImpl.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ jdbc.properties
│ │ log4j2.xml
│ │ logging.properties
│ │ mybatis.xml
│ │ tubemq.properties
│ │ tubemq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml
复制

源码实现

com.hong.spring.config.TubeMqConsumer

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.listener.UserListener;
import lombok.extern.log4j.Log4j2;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author: csh
 * @Date: 2021/4/27 18:06
 * @Description:zero配置
 */
@Log4j2
public class TubeMqConsumer {

    //服务地址
    private String addrHost;
    //topic
    private String topic;
    //消费组
    private String group;
    //
    private MessageSessionFactory messageSessionFactory;
    //
    PullMessageConsumer messagePullConsumer;

    @Autowired
    private UserListener userListener;


    public TubeMqConsumer(String addrHost, String topic, String group) {
        this.addrHost = addrHost;
        this.topic = topic;
        this.group = group;
    }

    public void init(){
       try {
           final ConsumerConfig consumerConfig = new ConsumerConfig(addrHost, group);
           consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
           messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
           messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
           messagePullConsumer.subscribe(topic, null);
           messagePullConsumer.completeSubscribe();
           recvStr();
       }catch (Exception e){
            log.error("tubemq初始化失败",e);
       }
    }

    @Async
    public void recvStr(){
        try {
            log.info("开始自动拉取消息!");

            while (true){
                ConsumerResult result = messagePullConsumer.getMessage();
                if (result.isSuccess()) {
                    List messageList = result.getMessageList();
                    for (Message message : messageList) {
                        log.info("接收到的消息: " + JSONObject.toJSONString(message)+"内容是:"+new String(message.getData()));
                        userListener.Listener(new String(message.getData()));
                    }
                    messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
                } else {
                    if (result.getErrCode() == 400) {
                        ThreadUtils.sleep(100);
                    } else {
                        if (result.getErrCode() != 404) {
                            log.error("Receive messages errorCode is %d, Error message is %s", result.getErrCode(), result.getErrMsg());
                        }
                    }
                }
            }
        }catch (Exception e){
            log.error("接收消失败请重试{}",e);
        }

    }
    /**
     *
     * 功能描述: 关闭接口
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/5/18 14:25
     */
    public void close(){
        try {
            if(messageSessionFactory!=null){
                messageSessionFactory.shutdown();
            }
        }catch (Exception e){
            log.error("关闭tubemq失败{}",e);
        }
    }

}
复制

com.hong.spring.dao.UserMapper

import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description:用户dao层
 */

public interface UserMapper {

    /**
     *
     * 功能描述:查询总条数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:31
     */
    List findAllUserList();
    /**
     *
     * 功能描述:获取总数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int findAllTotal();
    /**
     *
     * 功能描述:更新
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int update(User user);
    /**
     *
     * 功能描述:添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    int save(User user);
    /**
     *
     * 功能描述:批量添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 15:46
     */
    int insertBatch(@Param("list") List  list);
    /**
     *
     * 功能描述:通过id查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    User findById(int id);
    /**
     *
     * 功能描述:通过分页查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 16:05
     */
    List findByPage(UserAO ao);
}
复制

com.hong.spring.listener.UserListener

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description:用户监听
 */
@Log4j2
@Component
public class UserListener {

    @Autowired
    private IUserService userService;

    public void Listener(String str){
        log.info("获取的用户信息{}", str);
        User user = JSONObject.parseObject(str, User.class);
        DataResponse  save = userService.save(user);
        if(save==null || save.getData()==null || !save.getData()){
            log.info("添加失败,原因{}",JSONObject.toJSONString(save));
        }
    }

}
复制

com/hong/spring/mapper/UserMapper.xml




    
        
        
        
    

    

    

    

    

    
         INSERT INTO user ( user_name, age)
        VALUES (#{username,jdbcType=VARCHAR},
        #{age,jdbcType=INTEGER})
    

    
        insert into user
        ( user_name, age)
        values
        
            (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
        
    

    
        update user
        
            
                user_name=#{username,jdbcType=VARCHAR},
            
            
                age =#{age,jdbcType=INTEGER}
            
        
        where id = #{id,jdbcType=INTEGER}
    
复制

com.hong.spring.provider.UserServiceImpl

import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description:用户实现
 */
@Service("userService")
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse> findByAll() {
        List  allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList,allTotal);
    }
    @Override
    @Transactional
    public DataResponse  save(User user) {
        if(null==user){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save>0?true:false);
    }

    @Override
    public DataResponse  insertBatch(List  list) {
        if(null==list){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave>0?true:false);
    }

    @Override
    @Transactional
    public DataResponse  update(User user) {
        if(null==user || user.getId()==null){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update>0?true:false);
    }
    @Override
    public DataResponse  findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    @Override
    public DataResponse > findByPage(UserAO ao) {
        if(ao==null){
            ao.setPage(0);
            ao.setPageSize(10);
        }else{
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List  byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage,allTotal);
    }
}
复制

相关配置文件

application.properties

logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
复制

applicationContext.xml





   
   
   
   

   
   
   
   
   
   


   
      
      
      
      
      

   

   
   
   
      
         
         
            
               
                  text/plain;charset=UTF-8
                  text/html;charset=UTF-8
               
            
         
         
      
   


   
      
      
      
      
      

      
      
      
      
      
      
      
      
      

      
      
      
      

      
      
      
      
      
      
      
      
      
      
      
   

   
   
   
      
      
      
      
      
      
   

   
   
      
      
      
      
   
   
   
      
   
复制

jdbc.properties

config.properties:
#数据库驱动
jdbc.driver=com.mysql.jdbc.Driver
#数据库连接url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
#数据库用户名
jdbc.user=root
#数据库密码
jdbc.password=123456
复制

log4j2.xml



    
        
            
        
        
            
            
        
    
    
        
            
            
        
    
复制

logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
复制

mybatis.xml





    
    
        
        
        
        
        
        
        
        
        
        
        
        
    

    
    
        
    

复制

tubemq.properties

tubemq.url=localhost:8000
tubemq.topic_user=tubemq_spring_user
tubemq.group=tubemq_hong_consumer
复制

tubemq.xml





   
   
      
      
      
   
复制

WEB-INF/web.xml



    
        spring_tubemq_consumer
        org.springframework.web.servlet.DispatcherServlet
        
            contextConfigLocation
            classpath:applicationContext.xml,
                classpath:tubemq.xml
            
        
        1
    

    
        encodingFilter
        org.springframework.web.filter.CharacterEncodingFilter
        
            encoding
            UTF-8
        
        
            forceEncoding
            true
        
    
    
        encodingFilter
        /*
    

    
        spring_tubemq_consumer
        /
    
复制

spring_mq/spring_tubemq_consumer/pom.xml



    
        spring_mq
        com.hong
        1.0-SNAPSHOT
    
    4.0.0

    com.hong.tubemq
    spring_tubemq_consumer

    
        0.8.0-incubating
    

    
        
            spring_mq_common_api
            1.0-SNAPSHOT
            com.hong
        


        
            org.apache.tubemq
            tubemq-client
            ${tubemq-client-version}
        

        
            com.alibaba
            fastjson
            1.2.71
        
    
    
        
            
                src/main/java
                
                    **/*.properties
                    **/*.xml
                
                false
            
            
                src/main/resources
                
                    **/*.properties
                    **/*.xml
                
                false
            
        
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    6
                    6
                
            
        
    
复制

结果

数据库结果

简单做了下整合,用是可以用,可以在这个基础上再继续完善相关的功能。

springboot 整合TubeMq

因为官网也没有相关的支持,做了也没有啥太大的意义,等后续有支持再统一整合....(感觉没跟上时代...)

最后

相对来说这个TubeMq没有之前tars那么恶心,相对简洁功能过得去,也文档比较简单,但是想了解下为啥,鹅产开源的项目总感觉要么很难整、要么感觉好久没维护,感觉遗弃的娃一样....,看着19年的相关宣传又很霸气,怎么总感觉有点虎头蛇尾,大家觉得呢?

关联标签: