RocketMQ源码之路(二)NameServer路由中心源码分析

小编:管理员 138阅读 2022.08.03

一般了解RocketMQ的读者都知道,NameServer是RocketMQ的组织协调者,是RocketMQ对外提供服务的“大脑”。NameServer提供了路由管理,服务注册与服务发现等机制,是保证消息正确地从生产者到消费者的“指挥官”。那么,生产者生产的消息是如何正确地被消费者消费的呢?Broker的宕机是如何被生产者和消费者感知的呢?RocketMQ对外提供服务的可靠性是如何保障的呢?带着这几个问题,我们一起去深入了解RocketMQ NameServer的设计原理及实现吧!

一、NameServer的基本原理

我们熟知的几种常见的消息队列组件,比如Kafka,ActiveMQ,RabbitMQ等,都是一种基于主题的发布订阅机制,RocketMQ也正是基于这种机制实现的消息服务。消息生产者(Producer)将生产好的消息发布到某个主题,该主题下的消息在消息服务器(Broker)中进行传送或存储,由消费者(Consumer)进行订阅主题,从消息服务器中获取到消息后进行消费。消费者获取消息的方式通常有两种,一种是主动去消息服务器拉取消息(Pull Message),另外一种是由消息服务器推送消息(Push Message)给消费者。这种主题的发布订阅机制应用到分布式系统中,成功解耦了生产者和消费者。既然是分布式系统,那么常常存在分布式系统问题,比如某个消息服务器宕机了,生产者是如何感知这台消息服务器宕机了,从而避免将消息发送到这台消息服务器上,消费者是如何感知这台消息服务器宕机了,从而避免从这台消息服务器上拉取消息的呢?且这台宕机的消息服务器是如何从消息服务器实例列表中被剔除的呢?这一切都将归功于NameServer,它的诞生让动态感知、动态剔除、负载均衡成为可能。 图1-1是RocketMQ常见的物理部署图(图片来源:百度图库),采用的部署方式2m-2s(2Master,2Slave),本小节将根据此图阐述RocketMQ基本的流程原理,后面的小节将深入源码中,从源码中来验证基本流程原理。

文章一开始就说道,NameServer是整个RocketMQ消息服务系统的“大脑”,是指挥消息正确发送、消费“指挥官”,那么他是如何完成这样完美的演出的呢? NameServer被设计为一种无状态的服务注册发现中心,在NameServer集群中,各个NameServer之间是无感知,无通信的独立节点,任何一个NameServer节点挂掉,都不影响整体的消息服务。Broker在启动的时候,会向指定的NameServer列表中的每个NameServer注册,发送自身的元信息到每个NameServer中,这些元信息包含但不限于BrokerName,BrokerAddress,Broker端口,集群信息,Topic等信息,这些元信息将保存在NameServer的路由信息管理器(RouteInfoManager)中。当消息生产者在将生产的消息发送出去之前,会从NameServer中拉取Broker的信息列表,然后通过负载均衡算法从中选择一个Broker服务器将消息发送出去。当消息消费者要消费消息之前,也会去NameServer中拉取Broker的信息列表,从而从Broker中获取可消费的消息。Broker在首次启动会向NameServer注册元信息,启动后也会定期向NameServer发送心跳,这个周期默认是30秒,当然这个周期可以自定义,支持范围是10秒到60秒之间,每次心跳发送的数据包都是该Broker的元数据信息。NameServer也有自动检测能力,NameServer启动后会注册一个定时任务线程池,每10秒会自动扫描Broker列表,对于不再存活的Broker,将做剔除处理。这动态维护路由信息的能力,并不包含动态通知消息生产者,也就是说生产者并不会及时感知到非存活状态Broker被剔除,但是这并不影响消息的正确发送,因为生产者自身提供有容错机制来保证消息的正常发送。消费者与NameServer没有保持长连接,而是每30秒从NameServer获取所有Topic的信息列表,如果某个时刻某个Broker宕机,消费者可能需要30秒才能知道这个宕机的Broker是哪一个,当然这个值也是可以手动配置的,可根据实际业务来配置该值。消费者在感知Broker存活这一块,有自己的机制,比如每30秒向Broker发送心跳,且Broker每10秒会检测与消费者的连接情况,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。

二、NameServer的启动流程原理

在《RocketMQ源码之路(一)搭建RocketMQ源码环境》中,我们了解了如何使用IDE启动NameServer,那么本小节将和大家一起探讨NameServer的启动流程原理,我们将 从NameServer的启动类NamesrvStartup开始,和大家一起来阅读NameServer在启动源码,帮助大家理解NameServer的启动流程。 NameServer的启动类NamesrvStartup的main方法如下所示:

public static void main(String[] args) {
        main0(args);
}

public static NamesrvController main0(String[] args) {

    try {
        // 第一步:根据命令行参数创建一个NamesrvController对象,内部包含各种参数加载设置等操作
        // 并设置了namesrv的启动端口
        NamesrvController controller = createNamesrvController(args);

        // 第二步:启动controller
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand
                .getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}
复制

从main0方法中可以看出,启动NameServer只有两个步骤,第一步是创建NamesrvController实例对象,第二步调用NamesrvStartup的start方法启动controller。我们一起来阅读NamesrvStartup的createNamesrvController方法,看看在创建NamesrvController对象的具体流程,代码如下:

/**
 * 创建一个Name Server Controller
 *
 * @param args 命令行参数
 * @return Name Server Controller对象
 * @throws IOException IO异常
 * @throws JoranException Joran异常
 */
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    // 设置一个系统参数,key为rocketmq.remoting.version,当前版本值为:Version.V4_7_1,数值为355
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    // 构建-h 和 -n 的命令行参数option,并且自定义了一个P命令行参数,用于定义namesrv端口
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    // 解析完毕后的命令行参数
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        // 如果命令行参数为null,则退出虚拟机进程
        System.exit(-1);
        return null;
    }

    // 分别创建namesrv和nettyServer的config对象
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // 设置netty监听9876端口,这就是为什么namesrv的默认端口是9876,这里可以改成其他端口
    // 其实还可以修改上述命令行参数代码,自定义一个参数,用来设置监听端口,在启动的时候指定该参数
    String listenPort;
    if (commandLine.hasOption('P') && (StringUtils.isNumeric(listenPort = commandLine.getOptionValue('P')))) {
        nettyServerConfig.setListenPort(Integer.parseInt(listenPort));
    } else {
        nettyServerConfig.setListenPort(9876);
    }
    // 加载Name server config properties file
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    // 如果在启动参数加上选项-p,那么将打印出namesrvConfig和nettyServerConfig的属性值信息
    // 其中namesrvConfig主要配置了namesrv的信息,nettyServerConfig主要配置了netty的属性值信息
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    // 填充命令行commandLine中参数到namesrvConfig中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    // rocketmq_home默认来源于配置rocketmq.home.dir,如果没有配置,将从环境变量中获取ROCKETMQ_HOME参数
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out
                .printf("Please set the %s variable in your environment to match the location of the RocketMQ "
                                + "installation%n",
                        MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    // 自定义日志配置logback_namesrv.xml,可以了解博文(https://www.jianshu.com/p/3b9cb5e22052)来理解日志的配置加载
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    // 根据namesrvConfig, nettyServerConfig来创建一个NamesrvController对象
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // 将属性集合properties保存到controller的configuration属性中
    controller.getConfiguration().registerConfig(properties);

    return controller;
}
复制
关联标签: