RocketMQ学习九-Broker初步分析

瘢痂派生
• 阅读 1317

一,Broker缓存的数据

Broker主要缓存了路由信息,包含producer表,consumer表,consumerGroup表和topic表。这些信息是在ProducerManager,ConsumerManager,SubscriptionGroupManager,TopicConfigManager这几个类里进行管理的。

ProducerManager
//producer列表
HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable
  • groupChannelTable:各ProducerGroup中分别有哪些存活的Producer连接;每个连接的Producer最后一次发来心跳的时间
ConsumerManager
//consumer列表
ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable
  • consumerTable:每个ConsumerGroup中分别有哪些存活的Consumer连接,分别订阅了哪些Topic,订阅的每个Topic使用什么过滤条件(TAG)。
SubscriptionGroupManager
//ConsumerGroup表
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable
  • subscriptionGroupTable:各ConsumerGroup的消费行为特点,例如:消费失败后的最大重试次数;重试队列个数;如果从MasterBroker消费缓慢,切换到哪个Slave Broker进行消费
TopicConfigManager
//topic列表
ConcurrentMap<String, TopicConfig> topicConfigTable
  • topicConfigTable:分布在当前Broker上的各Topic分片的配置信息,如:包含的读/写Queue的数量;是否有读/写权限

二,Broker启动设计

  1. 创建BrokerController
    创建BrokerController类是在BrokerStartup#createBrokerController方法里进行的。先是进行参考解析,完了创建BrokerController类,紧接着调用其initialize方法,里面的逻辑主要有:
    1)加载topic,consumer消费进度,订阅关系与consumer过滤的配置,并会加载消息的日志文件
    2)再创建一个netty服务监听10909这个VIP端口
    3)初始化一系列线程池,然后在registerProcessor方法里将这些线程池与处理器进行关联,为以后不同的业务使用不同的线程池,也就是线程隔离
    4)启动一些定时任务,比如记录Broker状态,消费进度持久化等
    5)最后进行权限校验初始化和Rpc调用钩子相关服务,这些服务加载方式是Java的SPI方式进行的。
  2. 启动Broker

     public void start() throws Exception {
         //启动消息存储相关的任务
         if (this.messageStore != null) {
             this.messageStore.start();
         }
         //启动broker服务器
         if (this.remotingServer != null) {
             this.remotingServer.start();
         }
         //启动给消息发送者使用的netty服务
         if (this.fastRemotingServer != null) {
             this.fastRemotingServer.start();
         }
         //启动监控SSL连接文件的服务
         if (this.fileWatchService != null) {
             this.fileWatchService.start();
         }
         //启动外部API的客户端
         if (this.brokerOuterAPI != null) {
             this.brokerOuterAPI.start();
         }
         //启动pull模式相关的服务
         if (this.pullRequestHoldService != null) {
             this.pullRequestHoldService.start();
         }
         //启动心跳检测服务
         if (this.clientHousekeepingService != null) {
             this.clientHousekeepingService.start();
         }
         //启动消息过滤服务
         if (this.filterServerManager != null) {
             this.filterServerManager.start();
         }
         //如果没启动DLegerCommitLog ,就将Broker注册到NameServer上
         if (!messageStoreConfig.isEnableDLegerCommitLog()) {
             startProcessorByHa(messageStoreConfig.getBrokerRole());
             handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
         }
    
         /*向namesrv注册*/
         this.registerBrokerAll(true, false, true);
    
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
             @Override
             public void run() {
                 try {
                     BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                 } catch (Throwable e) {
                     log.error("registerBrokerAll Exception", e);
                 }
             }
         }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
         if (this.brokerStatsManager != null) {
             this.brokerStatsManager.start();
         }
    
         if (this.brokerFastFailure != null) {
             this.brokerFastFailure.start();
         }
    
    
     }
  • messageStore服务:处理消息的存储相关的日志,比如CommitLog,ConsumeQueue等
  • remotingServer服务:处理客户端producer&consumer的请求
  • fastRemotingServer服务:默认端口可能存在多用,可能会造成业务阻塞。新开一个VIP端口专门进行消息处理。不过4.5版本之后默认已关闭,是为了妆容之前版本。
  • fileWatchService服务:启动监控服务连接时用到的SSL连接文件的服务
  • brokerOuterAPI服务:RocketMQ控制台跟Broker交互时候的客户端
  • pullRequestHoldService服务:处理push模式消费,或者延迟消费的服务
  • clientHousekeepingService服务:心跳连接用的服务
  • filterServerManager服务:过滤消息服务
  • transactionalMessageCheckService服务:定期检查和处理事务消息服务
  • slaveSynchronize服务:主从路由信息同步服务
  1. netty服务端的启动
    这里可以参考之前文章第三大点4小点里的服务端的创建

参考文章:
Broker部分之Broker启动过程BrokerStartup(2)

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
双十一预售活动分析
2022年双十一促销活动已经开始,大家应该都提前开始关注今年双十一活动的时间表了吧?2022年10月24日晚8:00天猫双11预售时间,第一波销售时间10月31日晚8:0,第二波销售时间11月10日晚8:00;天猫双11的优惠力度是跨店每满30050
Easter79 Easter79
3年前
sql注入
反引号是个比较特别的字符,下面记录下怎么利用0x00SQL注入反引号可利用在分隔符及注释作用,不过使用范围只于表名、数据库名、字段名、起别名这些场景,下面具体说下1)表名payload:select\from\users\whereuser\_id1limit0,1;!(https://o
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Wesley13 Wesley13
3年前
mysql select将多个字段横向合拼到一个字段
表模式:CREATE TABLE tbl_user (  id int(11) NOT NULL AUTO_INCREMENT,  name varchar(255) DEFAULT NULL,  age int(11) DEFAULT NULL,  PRIMARY KEY (id)
Wesley13 Wesley13
3年前
MySQL 实战
项目七:各部门工资最高的员工(难度:中等)创建Employee 表,包含所有员工信息,每个员工有其对应的 Id,salary和departmentId。|Id|Name|Salary|DepartmentId|
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究