RocketMQ客户端是如何感知Broker节点的?

本文已收录在Github关注我,紧跟本系列专栏文章,咱们下篇再续!

  • ? 魔都架构师 | 全网30W技术追随者
  • ? 大厂分布式系统/数据中台实战专家
  • ? 主导交易系统百万级流量调优 & 车联网平台架构
  • ? AIGC应用开发先行者 | 区块链落地实践者
  • ? 以技术驱动创新,我们的征途是改变世界!
  • ? 实战干货:编程严选网

0 前言

RocketMQ的Pro只需配置一个接入地址,即可访问整个集群,而无需client配置每个Broker的地址。
即RocketMQ会自动根据要访问的topic名称、queue序号,找到对应Broker地址。

Q:咋实现的?

A:由NameServer协调Broker和client共同实现,可伸缩的分布式集群,都需类似服务。

1 NamingService作用

为client提供路由信息,帮助client找到对应的Broker,NamingService本身也是集群。分布式集群架构的通用设计。

负责维护集群内所有节点的路由信息:client需访问的某特定服务在啥节点。

集群的节点会主动连接NamingService服务,注册自身路由信息。给client提供路由寻址服务的方式:

  1. client直连NamingService,查询路由信息
  2. client连接集群内任一节点查询路由信息,节点再从自身缓存或查询NamingService

2 NameServer咋提供服务?

NameServer独立进程,为Broker、Pro、Con提供服务。支持单点、多节点集群部署。

NameServer最主要为client提供寻址服务,协助 client 找到topic对应的Broker地址。也负责监控每个Broker的存活状态。

每个NameServer节点都保存了集群所有Broker的路由信息,因此可独立提供服务。

每个Broker都需和【所有NameServer节点】通信。当 Broker 保存的 Topic 信息变化时,要主动通知所有 NameServer 更新路由信息。

为保证数据一致性,Broker会与所有NameServer节点建立长连接,定期上报Broker的路由信息。该上报路由信息的RPC请求,同时还起到 Broker 与 NameServer 间心跳作用,NameServer 依靠该心跳确定Broker健康状态。

client会选择连接某一NameServer节点,定期获取订阅topic的路由信息,用于Broker寻址。

每个 NameServer 节点都可独立提供服务,因此对于client(Pro和Con),只需选择任一 NameServer 节点来查询路由信息即可。

client在生产或消费某topic的消息前:

  • 先从NameServer查询这topic的路由信息
  • 然后根据路由信息获取到当前topic和队列对应的Broker物理地址
  • 再连接到Broker节点上生产或消费

如果NameServer检测到与Broker的连接中断,NameServer会认为这个Broker不能再提供服务。会立即把这Broker从路由信息中移除,避免client连接到一个不可用Broker。

client在与Broker通信失败后,会重新去NameServer拉取路由信息,然后连接到其他Broker继续生产或消费消息,实现自动切换失效Broker。

3 组成

UML

  • NamesrvStartup:程序入口
  • NamesrvController:NameServer的总控制器,负责所有服务的生命周期管理
  • BrokerHousekeepingService:监控Broker连接状态的代理类
  • DefaultRequestProcessor:负责处理 client、Broker 发送过来的RPC请求的处理器
  • ClusterTestRequestProcessor:用于测试的请求处理器
  • RouteInfoManager:NameServer最核心的实现类,负责维护所有集群路由信息,这些路由信息都保存在内存,无持久化

RouteInfoManager

如下5个Map保存了集群所有的Broker和topic的路由信息

public class RouteInfoManager {
 /**
 * 120s,broker 上一次心跳时间超过该值便会被剔除
 */
 private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
 private final ReadWriteLock lock = new ReentrantReadWriteLock();
 /**
 * 维护topic和queue信息
 * topic消息队列的路由信息,消息发送时会根据路由表进行负载均衡
 * K=topic名称
 * V=Map:
 * K=brokerName
 * V=队列数据,如读/写队列的数量、权重
 */
 private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
 /**
 * 维护集群中每个brokerName对应的Broker信息
 * broker的基础信息
 * K=brokerName,V=brokerName
 * broker所在的集群信息,主备broker的地址。
 */
 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
 /**
 * 维护clusterName与BrokerName的对应关系
 * broker集群信息
 * K=集群名称
 * V=集群中所有broker的名称
 */
 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
 /**
 * 维护每个Broker(brokerAddr)当前的动态信息,包括心跳更新时间,路由数据版本等
 * Broker状态信息,NameServer每次收到心跳包时会替换该信息。这也是NameServer每10s要扫描的信息。
 */
 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
 /**
 * 维护每个Broker (brokerAddr)对应的消息过滤服务的地址(Filter Server),用于服务端消息过滤
 * Broker上的FilterServer列表,用于类模式消息过滤。类模式过滤机制在4.4及以后版本被废弃
 */
 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

Broker信息

public class BrokerData implements Comparable<BrokerData> {
 // 集群名称
 private String cluster;
 private String brokerName;
 // 保存Broker物理地址 <brokerId,broker address>
 private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

4 NameServer处理Broker注册的路由信息

NameServer处理Broker和client所有RPC请求的入口方法:

4.1 DefaultRequestProcessor

4.1.1 processRequest

处理Broker注册请求

public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
 @Override
 public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
	// 典型的Request请求分发器,根据request.getCode()来分发请求到对应处理器
 switch (request.getCode()) {
 ...
 }
 return null;
 }

4.1.2 REGISTER_BROKER

Broker发给NameServer注册请求的Code为REGISTER_BROKER,根据Broker版本号不同,分别有两个不同处理方法

case RequestCode.REGISTER_BROKER:
 Version brokerVersion = MQVersion.value2Version(request.getVersion());
 if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
 return this.registerBrokerWithFilterServer(ctx, request);
 } else {
 return this.registerBroker(ctx, request);
 }
① registerBroker
public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) {
 	// registerBroker
 RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
 requestHeader.getClusterName(),
}
② registerBrokerWithFilterServer
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) {
 ...
 RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
 requestHeader.getClusterName(),

两个方法流程差不多,都是调用

4.2 RouteInfoManager

registerBroker

注册Broker路由信息。

public RegisterBrokerResult registerBroker(
 final String clusterName,
 final String brokerAddr,
 final String brokerName,
 final long brokerId,
 final String haServerAddr,
 final TopicConfigSerializeWrapper topicConfigWrapper,
 final List<String> filterServerList,
 final Channel channel) {
 RegisterBrokerResult result = new RegisterBrokerResult();
 try {
 try {
 // 加写锁,防止并发修改数据
 this.lock.writeLock().lockInterruptibly();
 // 更新clusterAddrTable
 Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
 if (null == brokerNames) {
 brokerNames = new HashSet<>();
 this.clusterAddrTable.put(clusterName, brokerNames);
 }
 brokerNames.add(brokerName);
 // 更新brokerAddrTable
 boolean registerFirst = false;
 BrokerData brokerData = this.brokerAddrTable.get(brokerName);
 if (null == brokerData) {
 registerFirst = true; // 标识需要先注册
 brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
 this.brokerAddrTable.put(brokerName, brokerData);
 }
 Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
 //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
 //The same IP:PORT must only have one record in brokerAddrTable
 // 更新brokerAddrTable中的brokerData
 Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
 while (it.hasNext()) {
 Entry<Long, String> item = it.next();
 if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
 it.remove();
 }
 }
 // 如果是新注册的Master Broker,或者Broker中的路由信息变了,需要更新topicQueueTable
 String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
 registerFirst = registerFirst || (null == oldAddr);
 if (null != topicConfigWrapper
 && MixAll.MASTER_ID == brokerId) {
 if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
 || registerFirst) {
 ConcurrentMap<String, TopicConfig> tcTable =
 topicConfigWrapper.getTopicConfigTable();
 if (tcTable != null) {
 for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
 this.createAndUpdateQueueData(brokerName, entry.getValue());
 }
 }
 }
 }
 // 更新brokerLiveTable
 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
 new BrokerLiveInfo(
 System.currentTimeMillis(),
 topicConfigWrapper.getDataVersion(),
 channel,
 haServerAddr));
 if (null == prevBrokerLiveInfo) {
 log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
 }
 // 更新filterServerTable
 if (filterServerList != null) {
 if (filterServerList.isEmpty()) {
 this.filterServerTable.remove(brokerAddr);
 } else {
 this.filterServerTable.put(brokerAddr, filterServerList);
 }
 }
 // 若是Slave Broker,需要在返回的信息中带上master的相关信息
 if (MixAll.MASTER_ID != brokerId) {
 String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
 if (masterAddr != null) {
 BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
 if (brokerLiveInfo != null) {
 result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
 result.setMasterAddr(masterAddr);
 }
 }
 }
 } finally {
 // 释放写锁
 this.lock.writeLock().unlock();
 }
 } catch (Exception e) {
 log.error("registerBroker Exception", e);
 }
 return result;
}

5 client定位Broker

对于client,无论Pro、Con,通过topic寻找Broker的流程一致,都是同一实现。
client启动后,会启动定时器,定期从NameServer拉取相关topic的路由信息,缓存在本地内存,需要时使用。

每个topic的路由信息由TopicRouteData对象表示:

public class TopicRouteData extends RemotingSerializable {
 private String orderTopicConf;
 private List<QueueData> queueDatas;
 private List<BrokerData> brokerDatas;
 private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

client选定队列后,可在对应QueueData找到对应BrokerName,然后找到对应BrokerData对象,最终找到对应Master Broker的地址。

6 根据topic,查询Broker的路由信息

RouteInfoManager#pickupTopicRouteData

NameServer处理client请求和处理Broker请求流程一致,都是通过路由分发器将请求分发的对应的处理方法中:

 public TopicRouteData pickupTopicRouteData(final String topic) {
 // 1.初始化返回值 topicRouteData
 TopicRouteData topicRouteData = new TopicRouteData();
 boolean foundQueueData = false;
 boolean foundBrokerData = false;
 Set<String> brokerNameSet = new HashSet<>();
 List<BrokerData> brokerDataList = new LinkedList<>();
 topicRouteData.setBrokerDatas(brokerDataList);
 HashMap<String, List<String>> filterServerMap = new HashMap<>();
 topicRouteData.setFilterServerTable(filterServerMap);
 try {
 try {
 // 2.加读锁
 this.lock.readLock().lockInterruptibly();
 // 3.先获取topic对应的queue信息
 List<QueueData> queueDataList = this.topicQueueTable.get(topic);
 if (queueDataList != null) {
 // 4.将queue信息存入返回值
 topicRouteData.setQueueDatas(queueDataList);
 foundQueueData = true;
 // 5.遍历队列,找出相关的所有BrokerName
 for (QueueData qd : queueDataList) {
 brokerNameSet.add(qd.getBrokerName());
 }
 // 6.遍历BrokerName,找到对应BrokerData,并写入返回结果中
 for (String brokerName : brokerNameSet) {
 BrokerData brokerData = this.brokerAddrTable.get(brokerName);
 if (null != brokerData) {
 BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
 .getBrokerAddrs().clone());
 brokerDataList.add(brokerDataClone);
 foundBrokerData = true;
 for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
 List<String> filterServerList = this.filterServerTable.get(brokerAddr);
 filterServerMap.put(brokerAddr, filterServerList);
 }
 }
 }
 }
 } finally {
 // 7.释放读锁
 this.lock.readLock().unlock();
 }
 } catch (Exception e) {
 log.error("pickupTopicRouteData Exception", e);
 }
 log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
 if (foundBrokerData && foundQueueData) {
 	// 8.返回结果
 return topicRouteData;
 }
 return null;
 }

本文由博客一文多发平台 OpenWrite 发布!

作者:公众号-JavaEdge原文地址:https://www.cnblogs.com/JavaEdge/p/18885051

%s 个评论

要回复文章请先登录注册