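Integrating Spring with message middleware (RocketMQ, Kafka, RabbitMQ, ActiveMQ, ZeroMQ): ZeroMQ

Editor: Admin · 199 views · 2022.08.01

Previous article: Integrating Spring with middleware (RocketMQ, Kafka, RabbitMQ, ActiveMQ, ZeroMQ): ActiveMQ

What is ZeroMQ?

ØMQ (also spelled ZeroMQ, 0MQ, or ZMQ) is a high-performance asynchronous messaging library designed for scalable distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, ZeroMQ runs without a dedicated message broker. The library's API is modeled on the familiar socket style. (Source: Wikipedia)

Official site: https://zeromq.org/

How does it work?

This article is a recommended reference: https://blog.csdn.net/weixin_37779156/article/details/102821706

Source code

Plain Java

Project structure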

│  java_zeromq.iml
│  pom.xml
│
└─src
    ├─main
    │  ├─java
    │  │  └─com
    │  │      └─hong
    │  │              Client.java
    │  │              Server.java
    │  │
    │  └─resources
    └─test
        └─java

Source implementation

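spring_mq/java_zeromq/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong</groupId>
    <artifactId>java_zeromq</artifactId>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jeromq</artifactId>
            <version>0.5.2</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>sonatype-nexus-snapshots</id>
            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>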

One-to-one pattern: Request-Reply

com.hong.Server

package com.hong;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

/**
 * Request-reply server.
 *
 * @author csh
 * @date 2021/4/27 14:31
 */
public class Server
{
    public static void main(String[] args) throws Exception
    {
        try (ZContext context = new ZContext()) {
            // Socket to talk to clients
            ZMQ.Socket socket = context.createSocket(SocketType.REP);
            socket.bind("tcp://*:5555");

            while (!Thread.currentThread().isInterrupted()) {
                // Block until a request is received
                byte[] request = socket.recv(0);

                // Print the request
                System.out.println(
                    "Received: [" + new String(request, ZMQ.CHARSET) + "]"
                );

                // Send a response
                String response = "Hello,i'm hong!";
                socket.send(response.getBytes(ZMQ.CHARSET), 0);
            }
        }
    }
}

com.hong.Client

package com.hong;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

/**
 * Request-reply client.
 *
 * @author csh
 * @date 2021/4/27 14:30
 */
public class Client {
    public static void main(String[] args) {
        // Create the client's context and socket
        ZMQ.Context context = ZMQ.context(1);
        System.out.println("Connecting to hello world server…");
        ZMQ.Socket requester = context.socket(SocketType.REQ);

        // Connect (not bind) the client to the server on port 5555
        requester.connect("tcp://localhost:5555");
        for (int requestNbr = 0; requestNbr != 100; requestNbr++) {
            String request = "hong";
            System.out.println("Sending hong " + requestNbr);
            requester.send(request.getBytes(ZMQ.CHARSET), 0);
            byte[] reply = requester.recv(0);
            System.out.println("Received " + new String(reply, ZMQ.CHARSET) + " " + requestNbr);
        }
        // Close the socket first, then terminate the context
        requester.close();
        context.term();
    }
}

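Result

The client sends a request and then waits for the server's reply. Every request must be matched by exactly one reply: from the requester's side the cycle is send-then-receive, and from the replier's side it is receive-then-send.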
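
The strict send/receive alternation can be modeled as a two-state machine. The sketch below is plain Java with no ZeroMQ dependency; `LockstepSocket` and its methods are hypothetical names used only for illustration, and the exception merely stands in for the way a real REQ socket rejects an out-of-order call.

```java
/** Toy model of the REQ side: send and recv must strictly alternate. */
final class LockstepSocket {
    // false: the next legal call is send; true: the next legal call is recv
    private boolean awaitingReply = false;

    /** Sending is only legal when no reply is outstanding. */
    void send(String request) {
        if (awaitingReply) {
            throw new IllegalStateException("must recv the reply before sending again");
        }
        awaitingReply = true;
    }

    /** Receiving is only legal after a request has been sent. */
    String recv() {
        if (!awaitingReply) {
            throw new IllegalStateException("must send a request before recv");
        }
        awaitingReply = false;
        return "reply";
    }

    /** Returns true if the call sequence ('s' = send, anything else = recv) is legal. */
    static boolean isLegal(String calls) {
        LockstepSocket socket = new LockstepSocket();
        try {
            for (char c : calls.toCharArray()) {
                if (c == 's') {
                    socket.send("hong");
                } else {
                    socket.recv();
                }
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```

In this model `isLegal("srsr")` is accepted, while `"ss"` (two sends in a row) and `"r"` (a receive with nothing sent) are rejected, which mirrors the send-receive pairing on the client side.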

Publish-subscribe: Publisher/Subscriber model

com.hong.PubServer

package com.hong;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

/**
 * Publish-subscribe: publisher (server side).
 *
 * @author csh
 * @date 2021/4/27 15:12
 */
public class PubServer {
    public static void main(String[] args) throws InterruptedException {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket publisher = context.socket(SocketType.PUB);
        publisher.bind("tcp://*:5555");
        // Give subscribers a moment to connect before publishing
        Thread.sleep(3000);
        for (int i = 0; i < 100; i++) {
            publisher.send(("admin " + i).getBytes(ZMQ.CHARSET), ZMQ.NOBLOCK);
            System.out.println("pub msg " + i);
            Thread.sleep(1000);
        }
        // Close the socket before terminating the context; the reverse order can block
        publisher.close();
        context.term();
    }
}

com.hong.SubClient

package com.hong;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

/**
 * Publish-subscribe: subscriber (client side).
 *
 * @author csh
 * @date 2021/4/27 15:13
 */
public class SubClient {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket subscriber = context.socket(SocketType.SUB);
        subscriber.connect("tcp://localhost:5555");
        // An empty prefix subscribes to every message
        subscriber.subscribe("".getBytes(ZMQ.CHARSET));
        for (int i = 0; i < 100; i++) {
            // Receive a message
            String string = new String(subscriber.recv(0), ZMQ.CHARSET);
            System.out.println("recv 1" + string);
        }
        // Close the socket, then terminate the context
        subscriber.close();
        context.term();
    }
}

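Result: start the server first, then start two clients. Once a message has been published, any client that was not connected at that moment never receives it; the publisher keeps nothing for late subscribers, so a missed message is simply lost.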
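
The loss for late subscribers, and the prefix matching behind `subscribe`, can be illustrated with a small in-memory model. This is a sketch in plain Java, not JeroMQ; `ToyPublisher`, `Subscriber`, and `lateJoinerSees` are hypothetical names, and the model ignores networking entirely.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of PUB/SUB delivery: nothing is stored for absent subscribers. */
final class ToyPublisher {
    /** A subscriber keeps only messages starting with its prefix ("" matches all). */
    static final class Subscriber {
        final String prefix;
        final List<String> received = new ArrayList<>();

        Subscriber(String prefix) {
            this.prefix = prefix;
        }
    }

    private final List<Subscriber> connected = new ArrayList<>();

    void connect(Subscriber subscriber) {
        connected.add(subscriber);
    }

    /** Delivers to subscribers connected right now whose prefix matches. */
    void publish(String message) {
        for (Subscriber s : connected) {
            if (message.startsWith(s.prefix)) {
                s.received.add(message);
            }
        }
    }

    /** Publishes "admin 0" to "admin 4"; the subscriber joins just before "admin 2". */
    static List<String> lateJoinerSees() {
        ToyPublisher publisher = new ToyPublisher();
        Subscriber late = new Subscriber("");
        for (int i = 0; i < 5; i++) {
            if (i == 2) {
                publisher.connect(late);
            }
            publisher.publish("admin " + i);
        }
        return late.received;
    }
}
```

Here the late subscriber ends up with "admin 2", "admin 3", and "admin 4" only; the first two messages are gone for good. If every subscriber must see every message, that has to be built on top, for example by letting subscribers request a replay over a separate channel.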

Push-pull pattern: push/pull

package com.hong;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

/**
 * Push-pull: push side (server).
 *
 * @author csh
 * @date 2021/4/27 15:16
 */
public class PushServer {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket push = context.socket(SocketType.PUSH);
        push.bind("ipc://hong");
        for (int i = 0; i < 10000000; i++) {
            // The second argument is the flags value, not a counter: 0 means a plain blocking send
            push.send("hong".getBytes(ZMQ.CHARSET), 0);
        }
        push.close();
        context.term();
    }
}
package com.hong;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Push-pull: pull side (client), with five worker threads.
 *
 * @author csh
 * @date 2021/4/27 15:16
 */
public class PullClient {
    public static void main(String[] args) {
        // Total number of messages received across all threads
        final AtomicInteger number = new AtomicInteger(0);
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable(){
                // Number of messages received by this thread
                private int here = 0;
                public void run() {
                    ZMQ.Context context = ZMQ.context(1);
                    ZMQ.Socket pull = context.socket(SocketType.PULL);
                    pull.connect("ipc://hong");
                    //pull.connect("ipc://fjs");
                    while (true) {
                        pull.recv();
                        int now = number.incrementAndGet();
                        here++;
                        if (now % 1000000 == 0) {
                            System.out.println(now + " here is : " + here);
                        }
                    }
                }
            }).start();
        }
    }
}

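Result: each message is pulled by exactly one consumer, and messages are not lost whether the pullers start before or after the pusher, because a PUSH socket with no connected peer blocks instead of dropping. This behaves like a point-to-point queue rather than a topic.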
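
How the messages are spread over the pullers can be sketched with a round-robin model. The code below is plain Java, not JeroMQ; `ToyPush`, `connectPuller`, and `distribute` are hypothetical names, and where the toy throws for a missing puller a real PUSH socket would block instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of PUSH/PULL: each message goes to exactly one puller, in turn. */
final class ToyPush {
    private final List<List<String>> pullers = new ArrayList<>();
    private int next = 0;

    /** Registers a puller and returns its inbox. */
    List<String> connectPuller() {
        List<String> inbox = new ArrayList<>();
        pullers.add(inbox);
        return inbox;
    }

    /** Hands the message to the next puller in round-robin order. */
    void push(String message) {
        if (pullers.isEmpty()) {
            // Assumption of this toy only: a real PUSH socket would block here
            throw new IllegalStateException("no puller connected");
        }
        pullers.get(next).add(message);
        next = (next + 1) % pullers.size();
    }

    /** Pushes the given number of messages and returns how many each puller got. */
    static int[] distribute(int messages, int pullerCount) {
        ToyPush push = new ToyPush();
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < pullerCount; i++) {
            inboxes.add(push.connectPuller());
        }
        for (int i = 0; i < messages; i++) {
            push.push("hong");
        }
        int[] counts = new int[pullerCount];
        for (int i = 0; i < pullerCount; i++) {
            counts[i] = inboxes.get(i).size();
        }
        return counts;
    }
}
```

With ten messages and three pullers the counts come out as 4, 3, and 3: every message is consumed once, and the load is spread roughly evenly. The `here` counter in PullClient shows the same effect per thread.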

Based on Spring MVC

spring_zeromq_producer (producer)

Project structure

│  pom.xml
│  spring_zeromq_producer.iml
│
├─src
│  ├─main
│  │  ├─java
│  │  │  └─com
│  │  │      └─hong
│  │  │          └─spring
│  │  │              ├─config
│  │  │              │      ZeroMqProducer.java
│  │  │              │
│  │  │              └─controller
│  │  │                  │  UserController.java
│  │  │                  │
│  │  │                  └─ao
│  │  │                          UserSaveAO.java
│  │  │
│  │  └─resources
│  │          application.properties
│  │          applicationContext.xml
│  │          log4j2.xml
│  │          logging.properties
│  │          zeromq.properties
│  │          zeromq.xml
│  │
│  └─test
│      └─java
└─web
    └─WEB-INF
            web.xml

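web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app>
    <servlet>
        <servlet-name>spring_zeromq_producer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:zeromq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_zeromq_producer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>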

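pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong</groupId>
    <artifactId>spring_zeromq_producer</artifactId>

    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>

        <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jeromq</artifactId>
            <version>0.5.2</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>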

com.hong.spring.config.ZeroMqProducer

package com.hong.spring.config;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

/**
 * ZeroMQ producer configuration.
 *
 * @author csh
 * @date 2021/4/27 18:06
 */
public class ZeroMqProducer {

    private ZMQ.Socket push;
    private Integer threads;
    private SocketType zmqType;
    private String addr;

    public ZeroMqProducer(Integer threads, SocketType zmqType, String addr) {
        this.threads = threads;
        this.zmqType = zmqType;
        this.addr = addr;
    }

    /** Creates the context and socket, then binds the socket to the configured address. */
    public ZMQ.Socket init(){
        ZMQ.Context context = ZMQ.context(threads);
        push = context.socket(zmqType);
        push.bind(addr);
        return push;
    }

    /**
     * Sends a message without blocking.
     *
     * @param msg the message body
     * @return true if the message was queued, false otherwise
     * @author csh
     * @date 2021/4/28 17:05
     */
    public Boolean send(String msg){
        return push.send(msg, ZMQ.NOBLOCK);
    }
}

com.hong.spring.controller.ao.UserSaveAO

package com.hong.spring.controller.ao;

import lombok.Data;

import java.io.Serializable;

/**
 * Request parameters for saving a user.
 *
 * @author csh
 * @date 2021/3/16 11:21
 */
@Data
public class UserSaveAO implements Serializable {
    private Integer id;
    private String username;
    private Integer age;
}

com.hong.spring.controller.UserController

package com.hong.spring.controller;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.config.ZeroMqProducer;
import com.hong.spring.controller.ao.UserSaveAO;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * User endpoint that forwards save requests to ZeroMQ.
 *
 * @author csh
 * @date 2020/8/18 16:11
 */
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {

    @Autowired
    private ZeroMqProducer push;


    @RequestMapping("save")
    public DataResponse save(UserSaveAO ao){
        log.info("添加用户入参{}",JSONObject.toJSONString(ao));
        if(null==ao){
            // "The parameter must not be empty"
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        try {
            Boolean send = push.send(JSONObject.toJSONString(ao));
            if(null==send || !send){
                // "Failed to add the user"
                return DataResponse.BuildFailResponse("添加用户失败!");
            }
            // "User added": the success path must return a success response, not a failure
            return DataResponse.BuildSuccessResponse("添加用户成功!");
        }catch (Exception e){
            // Pass the exception as the last argument so the stack trace is logged
            log.error("添加出错", e);
            return DataResponse.BuildFailResponse("添加出错请重试!");
        }
    }
}

application.properties

logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

applicationContext.xml

(XML markup not preserved. The only surviving values are two media types: text/plain;charset=UTF-8 and text/html;charset=UTF-8.)

log4j2.xml

(XML content not preserved.)

logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

zeromq.properties

zeromq.url=ipc://springMvcHong
zeromq.threads=1
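
To make the role of the two keys concrete, here is a minimal sketch of reading them with `java.util.Properties`. In the project itself Spring resolves these placeholders; the `ZeroMqSettings` class and its `parse` method are hypothetical and exist only to show how `zeromq.url` and `zeromq.threads` map onto the address and I/O thread count handed to `ZeroMqProducer`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical holder for the two values in zeromq.properties. */
final class ZeroMqSettings {
    final String url;   // endpoint the socket binds or connects to
    final int threads;  // number of I/O threads for the ZeroMQ context

    ZeroMqSettings(String url, int threads) {
        this.url = url;
        this.threads = threads;
    }

    /** Parses properties text; zeromq.url is required, zeromq.threads defaults to 1. */
    static ZeroMqSettings parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalArgumentException("unreadable properties", e);
        }
        String url = props.getProperty("zeromq.url");
        if (url == null || url.isEmpty()) {
            throw new IllegalArgumentException("zeromq.url is required");
        }
        int threads = Integer.parseInt(props.getProperty("zeromq.threads", "1").trim());
        return new ZeroMqSettings(url, threads);
    }
}
```

The producer and the consumer must use the same `zeromq.url`, since one side binds to it and the other connects to it.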

zeromq.xml

(XML content not preserved.)

Tomcat configuration

Result:

Request URL: http://localhost:8588/user/save?username=spring_zeromq&age=1001
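
The query parameters in this URL are what Spring MVC binds to the `username` and `age` fields of `UserSaveAO`. As a small illustration, the URL can be assembled programmatically; `SaveUrlBuilder` is a hypothetical helper, and the host and port are simply the ones from the request above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that appends URL-encoded query parameters to a base URL. */
final class SaveUrlBuilder {
    static String build(String baseUrl, Map<String, String> params) {
        StringBuilder url = new StringBuilder(baseUrl);
        char separator = '?';
        try {
            for (Map.Entry<String, String> e : params.entrySet()) {
                url.append(separator)
                   .append(URLEncoder.encode(e.getKey(), "UTF-8"))
                   .append('=')
                   .append(URLEncoder.encode(e.getValue(), "UTF-8"));
                separator = '&';
            }
        } catch (UnsupportedEncodingException ex) {
            // UTF-8 is guaranteed to be available on every JVM
            throw new IllegalStateException(ex);
        }
        return url.toString();
    }

    /** Rebuilds the request used in the test above. */
    static String example() {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("username", "spring_zeromq");
        params.put("age", "1001");
        return build("http://localhost:8588/user/save", params);
    }
}
```

Encoding matters as soon as a user name contains spaces or non-ASCII characters; for the plain values used here the result is identical to the URL shown above.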

spring_zeromq_consumer

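spring_mq/spring_zeromq_consumer/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong</groupId>
    <artifactId>spring_zeromq_consumer</artifactId>

    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>

        <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jeromq</artifactId>
            <version>0.5.2</version>
        </dependency>
    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>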

com.hong.spring.config.ZeroMqConsumer

package com.hong.spring.config;

import com.hong.spring.listener.UserListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

/**
 * ZeroMQ consumer configuration.
 *
 * @author csh
 * @date 2021/4/27 18:06
 */
public class ZeroMqConsumer {

    @Autowired
    private UserListener userListener;

    private ZMQ.Socket socket;
    private Integer threads;
    private SocketType zmqType;
    private String addr;

    public ZeroMqConsumer(Integer threads, SocketType zmqType, String addr) {
        this.threads = threads;
        this.zmqType = zmqType;
        this.addr = addr;
    }

    public void init(){
        ZMQ.Context context = ZMQ.context(threads);
        socket = context.socket(zmqType);
        socket.connect(addr);
        // @Async has no effect when a bean calls its own method, so calling recvStr()
        // directly would block container startup forever. Run the loop on its own
        // thread instead; from here on only that thread touches the socket.
        Thread receiver = new Thread(new Runnable() {
            public void run() {
                recvStr();
            }
        }, "zeromq-consumer");
        receiver.setDaemon(true);
        receiver.start();
    }

    /** Blocks on the socket and hands every non-empty message to the listener. */
    public void recvStr(){
        while (true){
            String str = socket.recvStr();
            if(!StringUtils.isEmpty(str)){
                userListener.Listener(str);
            }
        }
    }

}
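
A blocking receive loop should not run on the thread that initializes the bean, and Spring's `@Async` only takes effect when the method is called through the bean's proxy, not from inside the same object. The sketch below shows the dedicated-thread approach in plain Java; a `BlockingQueue` stands in for the blocking ZeroMQ socket, and `BackgroundReceiver` with its methods is a hypothetical name, not part of the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Runs a blocking receive loop on its own daemon thread. */
final class BackgroundReceiver {
    private final BlockingQueue<String> source; // stands in for the blocking socket
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private Thread worker;

    BackgroundReceiver(BlockingQueue<String> source) {
        this.source = source;
    }

    /** Starts the loop and returns immediately, so startup is not blocked. */
    void init() {
        worker = new Thread(new Runnable() {
            public void run() {
                receiveLoop();
            }
        }, "receiver");
        worker.setDaemon(true);
        worker.start();
    }

    private void receiveLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String message = source.take(); // blocks, like recvStr()
                if (!message.isEmpty()) {
                    handled.add(message);       // stands in for the listener call
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit quietly on shutdown
        }
    }

    void stop() {
        worker.interrupt();
    }

    /** Feeds two messages and waits up to two seconds for both to be handled. */
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        BackgroundReceiver receiver = new BackgroundReceiver(queue);
        receiver.init();
        queue.add("a");
        queue.add("b");
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (receiver.handled.size() < 2 && System.nanoTime() < deadline) {
            Thread.yield();
        }
        receiver.stop();
        return new ArrayList<>(receiver.handled);
    }
}
```

The same shape carries over to the consumer: `init()` only wires things up and starts the thread, and the socket is then used exclusively by that thread, which matters because ZeroMQ sockets are not meant to be shared between threads.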

com.hong.spring.dao.UserMapper

package com.hong.spring.dao;

import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * User DAO layer.
 *
 * @author csh
 * @date 2020/8/18 15:04
 */
public interface UserMapper {

    /** Returns all users. */
    List<User> findAllUserList();

    /** Returns the total number of users. */
    int findAllTotal();

    /** Updates a user. */
    int update(User user);

    /** Inserts a user. */
    int save(User user);

    /** Inserts a batch of users. */
    int insertBatch(@Param("list") List<User> list);

    /** Finds a user by id. */
    User findById(int id);

    /** Returns one page of users. */
    List<User> findByPage(UserAO ao);
}

com.hong.spring.listener.UserListener

package com.hong.spring.listener;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Listener that persists users received from ZeroMQ.
 *
 * @author csh
 * @date 2021/3/16 11:14
 */
@Log4j2
@Component
public class UserListener {

    @Autowired
    private IUserService userService;

    public void Listener(String str){
        // Logs the received user JSON
        log.info("获取的用户信息{}", str);
        User user = JSONObject.parseObject(str, User.class);
        DataResponse<Boolean> save = userService.save(user);
        if(save==null || save.getData()==null || !save.getData()){
            // Logs the reason the insert failed
            log.info("添加失败,原因{}",JSONObject.toJSONString(save));
        }
    }
}

com/hong/spring/mapper/UserMapper.xml

(XML markup not preserved. Only the SQL text of the insert, batch insert, and update statements survives.)

INSERT INTO user (user_name, age)
VALUES (#{username,jdbcType=VARCHAR},
#{age,jdbcType=INTEGER})

insert into user
(user_name, age)
values
(#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})

update user
user_name=#{username,jdbcType=VARCHAR},
age =#{age,jdbcType=INTEGER}
where id = #{id,jdbcType=INTEGER}

com.hong.spring.provider.UserServiceImpl

package com.hong.spring.provider;

import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * User service implementation.
 *
 * @author csh
 * @date 2020/8/18 15:16
 */
@Service("userService")
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse<List<User>> findByAll() {
        List<User> allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList,allTotal);
    }
    @Override
    @Transactional
    public DataResponse<Boolean> save(User user) {
        if(null==user){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save > 0);
    }

    @Override
    public DataResponse<Boolean> insertBatch(List<User> list) {
        if(null==list){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave > 0);
    }

    @Override
    @Transactional
    public DataResponse<Boolean> update(User user) {
        if(null==user || user.getId()==null){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update > 0);
    }
    @Override
    public DataResponse<User> findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    @Override
    public DataResponse<List<User>> findByPage(UserAO ao) {
        if(ao==null){
            // Create a default query first; calling setters on a null reference would throw.
            // Assumes UserAO has a no-argument constructor.
            ao = new UserAO();
            ao.setPage(0);
            ao.setPageSize(10);
        }else{
            // Convert the page number into a row offset
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List<User> byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage,allTotal);
    }
}

application.properties

logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

applicationContext.xml

(XML markup not preserved. The only surviving values are two media types: text/plain;charset=UTF-8 and text/html;charset=UTF-8.)

jdbc.properties

config.properties:
# Database driver
jdbc.driver=com.mysql.jdbc.Driver
# Database connection URL
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
# Database user name
jdbc.user=root
# Database password
jdbc.password=123456

log4j2.xml

(XML content not preserved.)

logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

zeromq.xml

(XML content not preserved.)

mybatis.xml

(XML content not preserved.)

zeromq.properties

zeromq.url=ipc://springMvcHong
zeromq.threads=1

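WEB-INF/web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app>
    <servlet>
        <servlet-name>spring_zeromq_consumer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:zeromq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_zeromq_consumer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>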

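Tomcat configuration

Result: the implementation is fairly simple. The main shortcomings are that there is no transaction support and no redelivery; both have to be implemented by hand.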

19:00:50.902 [RMI TCP Connection(3)-127.0.0.1] INFO com.hong.spring.listener.UserListener - 获取的用户信息{"age":1001,"username":"spring_zeromq"}
19:00:53.566 [RMI TCP Connection(3)-127.0.0.1] INFO com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
19:00:53.615 [RMI TCP Connection(3)-127.0.0.1] DEBUG org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
19:00:53.630 [RMI TCP Connection(3)-127.0.0.1] DEBUG org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@5eb0675]
19:00:53.647 [RMI TCP Connection(3)-127.0.0.1] DEBUG org.mybatis.spring.transaction.SpringManagedTransaction - JDBC Connection [com.mysql.jdbc.JDBC4Connection@2a63a942] will be managed by Spring
19:00:53.652 [RMI TCP Connection(3)-127.0.0.1] DEBUG com.hong.spring.dao.UserMapper.save - ==> Preparing: INSERT INTO user ( user_name, age) VALUES (?, ?) 
19:00:53.688 [RMI TCP Connection(3)-127.0.0.1] DEBUG com.hong.spring.dao.UserMapper.save - ==> Parameters: spring_zeromq(String), 1001(Integer)
19:00:54.319 [RMI TCP Connection(3)-127.0.0.1] DEBUG com.hong.spring.dao.UserMapper.save - <== Updates: 1
19:00:54.328 [RMI TCP Connection(3)-127.0.0.1] DEBUG com.alibaba.druid.pool.PreparedStatementPool - stmt enter cache
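
Since redelivery has to be built by hand, one simple starting point is to retry a failed non-blocking send with a growing pause. The sketch below is plain Java under stated assumptions: `RetryingSender` is a hypothetical helper, and the `BooleanSupplier` stands in for a call such as the producer's `send`, which returns false when the message could not be queued. It only retries the local send; it does not confirm that the consumer processed the message.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: retries a send attempt with exponential backoff. */
final class RetryingSender {
    /**
     * Calls attempt until it returns true or maxAttempts is reached.
     *
     * @return the number of attempts used on success, or -1 on failure
     */
    static int sendWithRetry(BooleanSupplier attempt, int maxAttempts, long initialBackoffMillis) {
        long backoff = initialBackoffMillis;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return -1;
                }
                backoff *= 2; // wait twice as long before the next attempt
            }
        }
        return -1;
    }
}
```

In the controller this could wrap the send, for example `RetryingSender.sendWithRetry(() -> push.send(json), 3, 50L)`, where the attempt count and delay are illustrative values. Anything stronger, such as at-least-once delivery, needs an acknowledgement from the consumer and a store of unacknowledged messages.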

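Based on Spring Boot

I went through the Spring documentation and related real-world examples, and many of the demos have problems or take ad hoc approaches; material on the topic is also scarce. The Spring Boot version will therefore be completed once Spring publishes an official demo. What follows is written the same way as the Spring MVC version and is for reference only.

https://docs.spring.io/spring-integration/docs/current/reference/html/zeromq.html#zeromq

springboot_zeromq_api (port: 8586, dubbo: 20880)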

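springboot_all/springboot_zeromq_api/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_zeromq_api</artifactId>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>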

com.hong.springboot.controller.IndexController

package com.hong.springboot.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Home page.
 *
 * @author csh
 * @date 2021/1/12 10:16
 */
@RestController
public class IndexController {
    @RequestMapping("/")
    public String index(){
        // "Success!"
        return "成功!";
    }
}

com.hong.springboot.controller.UserController

package com.hong.springboot.controller;


import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.annotation.Reference;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * User endpoints.
 *
 * @author csh
 * @date 2020/8/18 16:11
 */
@RestController
@Slf4j
@RequestMapping("/user")
public class UserController {
    @Reference
    private IUserService userService;

    @GetMapping("/findByAll")
    public DataResponse<List<User>> findByAll(){
        try {
            return userService.findByAll();
        } catch (Exception e){
            // Pass the exception as the last argument so the stack trace is logged
            log.error("查询出错", e);
        }
        return DataResponse.BuildFailResponse("查询出错!");
    }

    @PostMapping("/save")
    public DataResponse<Boolean> save(User ao){
        if(null==ao || ao.getAge()==null || StringUtils.isBlank(ao.getUsername())){
            // "The parameter must not be empty"
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        return userService.save(ao);
    }
}

com.hong.springboot.Application

package com.hong.springboot;


import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot Dubbo consumer.
 *
 * @author csh
 * @date 2020/11/21 11:37
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

application.properties

# dubbo configuration
# Application name
dubbo.application.name=springboot_dubbo_consumer
# Registry protocol
dubbo.registry.protocol=zookeeper
# Registry address
dubbo.registry.address=zookeeper://127.0.0.1:2181
# Package scanned for annotations; this setting registers the services in ZooKeeper
dubbo.protocol.scan=com.hong.springboot.api
# Dubbo port
dubbo.protocol.port=20880
# Protocol name
dubbo.protocol.name=dubbo
# Package scan
dubbo.scan.basePackages=com.hong.springboot.controller

# Avoid port conflicts
server.port=8586

springboot_zeromq_producer (port: 8587, dubbo: 20881)

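springboot_all/springboot_zeromq_producer/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_zeromq_producer</artifactId>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-zeromq</artifactId>
            <version>5.4.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>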

com.hong.springboot.config.DruidConfig

package com.hong.springboot.config;

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * Data source configuration.
 *
 * @author csh
 * @date 2021/1/8 18:08
 */
@Configuration
public class DruidConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource(){
        return new DruidDataSource();
    }
}

com.hong.springboot.config.ZeroMQProducer

package com.hong.springboot.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

import javax.annotation.PostConstruct;

@Configuration
@Service
public class ZeroMQProducer {


    private ZMQ.Context context;

    private ZMQ.Socket socket;

    private SocketType mode = SocketType.PUSH;

    @Value("${spring.zeromq.publisher.ioNum}")
    private Integer ioNum = 1;
    @Value("${spring.zeromq.subscriber.address}")
    private String address;

    public Integer getIoNum() {
        return ioNum;
    }

    public void setIoNum(Integer ioNum) {
        this.ioNum = ioNum;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @PostConstruct
    public void init(){
        context = ZMQ.context(ioNum);
        socket = context.socket(mode);
        socket.bind(address);
    }

    public boolean send(String msg) {
        return socket.send(msg, ZMQ.NOBLOCK);
    }
}
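
The `send` method above passes `ZMQ.NOBLOCK`, so it returns `false` instead of waiting when the message cannot be queued (for example, when no consumer is connected yet). A caller therefore has to check the return value. The following is only a minimal sketch of that idea using the JDK alone: a bounded `BlockingQueue` stands in for the socket's send buffer, and the class and method names (`NonBlockingSendSketch`, `trySend`, `sendWithRetry`, `drainOne`) are hypothetical, not part of JeroMQ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for a PUSH socket with a small send buffer. */
class NonBlockingSendSketch {

    // Bounded buffer playing the role of the socket's send queue (assumption).
    private final BlockingQueue<String> buffer;

    NonBlockingSendSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /** Like a non-blocking send: returns false immediately when the buffer is full. */
    boolean trySend(String msg) {
        return buffer.offer(msg);
    }

    /** Retries a few times with a short pause, then reports failure to the caller. */
    boolean sendWithRetry(String msg, int attempts, long pauseMillis) {
        for (int i = 0; i < attempts; i++) {
            if (trySend(msg)) {
                return true;
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /** Simulates a consumer taking one message off the buffer (null if empty). */
    String drainOne() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        NonBlockingSendSketch sketch = new NonBlockingSendSketch(1);
        System.out.println("first send accepted: " + sketch.trySend("a"));
        System.out.println("second send accepted: " + sketch.sendWithRetry("b", 2, 10));
    }
}
```

Whether to retry, drop the message, or report an error to the caller depends on the application; the only point here is that a `false` result should not be ignored.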

com.hong.springboot.dao.UserMapper

package com.hong.springboot.dao;

import com.hong.springboot.entity.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description: User DAO layer
 */

public interface UserMapper {
    @Select("select id,user_name,age from user")
    List<User> findAllUser();

    @Insert("insert into user (user_name,age) values(#{username},#{age})")
    int insert(User user);
}

com.hong.springboot.provider.UserServiceImpl

package com.hong.springboot.provider;


import com.alibaba.druid.util.StringUtils;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.config.ZeroMQProducer;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description: User service implementation
 */
@Service(interfaceClass = IUserService.class,timeout = 6000)
@Slf4j
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;
    @Autowired
    private ZeroMQProducer zeroMQProducer;


    @Override
    public DataResponse<List<User>> findByAll() {
        List<User> allUserList = userDao.findAllUser();
        return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
    }

    @Override
    public DataResponse<Boolean> save(User userAO) {
        log.info("User to be sent via ZeroMQ: {}",JSONObject.toJSONString(userAO));
        try {
            // The producer sends in non-blocking mode, so a false result
            // means the message was not queued and must not be reported as sent.
            boolean sent = zeroMQProducer.send(JSONObject.toJSONString(userAO));
            if (!sent) {
                log.warn("ZeroMQ send was not accepted");
                return DataResponse.BuildSuccessResponse(false);
            }
        }catch (Exception e){
            log.error("ZeroMQ send failed",e);
            return DataResponse.BuildSuccessResponse(false);
        }
        return DataResponse.BuildSuccessResponse(true);
    }

    @Transactional
    @Override
    public DataResponse<Boolean> reallySave(User user) {
        log.info("User to be inserted: {}",JSONObject.toJSONString(user));
        if(null==user || user.getAge()==null || StringUtils.isEmpty(user.getUsername())){
            return DataResponse.BuildFailResponse("Parameters must not be empty!");
        }
        int insert = userDao.insert(user);
        return insert>0?DataResponse.BuildSuccessResponse(true):DataResponse.BuildFailResponse("Insert failed",false);
    }
}

com.hong.springboot.Application

package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@MapperScan("com.hong.springboot.dao")
@EnableDubbo
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.mydomain.logging.autoconfigure.LoggingAutoConfiguration

application.properties

rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000

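#dubbo configuration
# Service (application) name
dubbo.application.name=springboot_rabbitmq_producer

dubbo.registry.protocol=zookeeper
# Registry address
dubbo.registry.address=zookeeper://127.0.0.1:2181
# Package scanned for annotated services, which are then registered in ZooKeeper
dubbo.protocol.scan=com.hong.springboot.api
# Dubbo protocol port
dubbo.protocol.port=20881
# Protocol name
dubbo.protocol.name=dubbo

# HTTP port, chosen to avoid conflicts with the other modules
server.port=8587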
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456


# MyBatis configuration
mybatis.typeAliasesPackage=com.hong.springboot.entity

# ZeroMQ configuration
spring.zeromq.publisher.ioNum=4
spring.zeromq.publisher.address=tcp://*:5555
spring.zeromq.publisher.topic=springboot_hong
spring.zeromq.subscriber.address=ipc://springboot_hong

springboot_zeromq_consumer (port: 8588, dubbo: 20882)

springboot_all/springboot_zeromq_consumer/pom.xml

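<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_zeromq_consumer</artifactId>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-zeromq</artifactId>
            <version>5.4.6</version>
        </dependency>

        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>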

com.hong.springboot.Application

package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

com.hong.springboot.config.ZeroMQConsumer

package com.hong.springboot.config;

import com.alibaba.dubbo.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

import javax.annotation.PostConstruct;

@Service
public class ZeroMQConsumer {

    private ZMQ.Context context;

    private ZMQ.Socket socket;

    private SocketType mode = SocketType.PULL;

    @Autowired
    private MessageCallback<String> callback;


    @Value("${spring.zeromq.publisher.ioNum}")
    private Integer ioNum = 1;
    @Value("${spring.zeromq.subscriber.address}")
    private String address;

    public Integer getIoNum() {
        return ioNum;
    }

    public void setIoNum(Integer ioNum) {
        this.ioNum = ioNum;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @PostConstruct
    public void init(){
        context = ZMQ.context(ioNum);
        socket = context.socket(mode);
        socket.connect(address);
        // @Async has no effect on a method called from inside the same bean,
        // so the blocking receive loop gets its own daemon thread. Calling it
        // directly here would keep init() from ever returning.
        Thread receiver = new Thread(this::receive, "zeromq-consumer");
        receiver.setDaemon(true);
        receiver.start();
    }

    // Blocks on the PULL socket and hands every non-empty message to the callback.
    private void receive() {
        while (!Thread.currentThread().isInterrupted()){
            String str = socket.recvStr();
            if(!StringUtils.isEmpty(str)){
                System.out.println("Message: "+str);
                callback.onMessage(str);
            }
        }
    }
}
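
The consumer's receive loop blocks on `recvStr()` indefinitely, so it has to run on a thread of its own if the bean's initialization is to complete. The following is only a minimal sketch of that pattern using the JDK alone: a `BlockingQueue` stands in for the PULL socket, a `Consumer<String>` stands in for `MessageCallback`, and the class name `BackgroundReceiverSketch` is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Runs a blocking receive loop on a daemon thread and forwards messages to a callback. */
class BackgroundReceiverSketch {

    private final BlockingQueue<String> source;   // stand-in for the PULL socket (assumption)
    private final Consumer<String> callback;      // stand-in for MessageCallback (assumption)
    private Thread worker;

    BackgroundReceiverSketch(BlockingQueue<String> source, Consumer<String> callback) {
        this.source = source;
        this.callback = callback;
    }

    /** Returns immediately; the blocking loop runs on the worker thread. */
    void start() {
        worker = new Thread(this::loop, "receiver-sketch");
        worker.setDaemon(true);
        worker.start();
    }

    /** Asks the loop to stop by interrupting the worker thread. */
    void stop() {
        if (worker != null) {
            worker.interrupt();
        }
    }

    private void loop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String msg = source.take();       // blocks until a message arrives
                if (!msg.isEmpty()) {             // skip empty messages, as the consumer does
                    callback.accept(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the loop exits
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        BackgroundReceiverSketch receiver =
                new BackgroundReceiverSketch(queue, msg -> System.out.println("received: " + msg));
        receiver.start();                         // does not block the caller
        queue.put("{\"username\":\"demo\",\"age\":1}");
        Thread.sleep(100);                        // give the worker time to handle the message
        receiver.stop();
    }
}
```

With a real ZeroMQ socket, the socket should be used only from the receiving thread once the loop has started, since ZeroMQ sockets are not thread-safe.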

com.hong.springboot.config.MessageCallback

package com.hong.springboot.config;

import java.io.Serializable;

public interface MessageCallback<T extends Serializable> {
    void onMessage(T msg);
}

com.hong.springboot.listener.UserListener

package com.hong.springboot.listener;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.config.MessageCallback;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;

import java.io.Serializable;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description: User message listener
 */
@Service
@Log4j2
public class UserListener implements MessageCallback<String>, Serializable {

    @Reference
    private IUserService userService;



    @Override
    public void onMessage(String msg) {
        if(null==msg){
            return;
        }
        try {
            User user = JSONObject.parseObject(msg,User.class);
            log.info("User to be inserted: {}",JSONObject.toJSONString(user));
            DataResponse<Boolean> save = userService.reallySave(user);
            // Boolean.TRUE.equals avoids a NullPointerException when data is null.
            if(save==null || !Boolean.TRUE.equals(save.getData())){
                log.info("Insert failed, reason: {}",JSONObject.toJSONString(save));
            }
        }catch (Exception e){
            log.error("Error while inserting user",e);
        }
    }
}

application.properties

rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000

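# HTTP port, chosen to avoid conflicts with the other modules
server.port=8588

#dubbo configuration
# Service (application) name
dubbo.application.name=springboot_rocketmq_consumer

dubbo.registry.protocol=zookeeper
# Registry address
dubbo.registry.address=zookeeper://127.0.0.1:2181
# Package scanned for annotated services, which are then registered in ZooKeeper
dubbo.protocol.scan=com.hong.springboot.api
# Dubbo protocol port
dubbo.protocol.port=20882
# Protocol name
dubbo.protocol.name=dubbo
# Packages scanned for Dubbo annotations
dubbo.scan.basePackages=com.hong.springboot.listener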
spring.zeromq.publisher.ioNum=4
spring.zeromq.publisher.address=tcp://*:5555
spring.zeromq.publisher.topic=springboot_hong
spring.zeromq.subscriber.address=ipc://springboot_hong

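Result:

Request: http://localhost:8586/user/save?username=springboot_zeromq&age=1001

Closing thoughts

ZeroMQ is rarely used on the Java side in China, and there is little material about it online. Judging from the Spring website, support for the relevant API was added only recently, so the integration is not yet mature. The Spring Boot example above is also not annotation-driven: it works, but it is not a final version. I will refine it once the official documentation is more complete.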
