Java进阶-RocketMQ原理
高级功能部分
- 消息中心启动流程(源码分析略)
架构设计
- MQ主要分为4个角色:生产者、消费者、消息中心(broker)、NameServer
- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:名称服务
- 作用:路由的发现以及注册
- 给Broker定时注册对应的路由信息(Topic对应信息,Broker的信息)
- 给消费者、生产者及时的去Namerserver获取最新的路由信息,会缓存在本地(每60s更新一次)
- 特点:
- NameServer之间是无状态,并且NameServer之间是不会去通信
- 有可能出现在某一个时间NameServer之间的数据不一致
- Broker:数据存储节点
- 一个集群中可以有多个Borker,通过Brokername进行区分 boker-a,boker-b
- 对于所有的boker-a,boker-b之间存储的消息是不一样的
- 对于不同的broker-a,broker-b一般节点会部署在不同的服务器
- 对于同一个broker,其中master负责读写消息,slave只负责读,如果brokerid=0代表的就是master,如果是>0就是slave,只有id=1 的slave可以负责读,其他的不能读
- 主要工作:
- 对于消息的存储以及转发
- 定时向NameServer发送心跳
- 定时同步数据到slave(同步、异步 ->同步数据)
- 启动流程
- 启动NameServer,启动broker,启动生产者,启动消费者
消息存储

- 生产者发送所有消息会存储在CommitLog文件中,顺序写入
- 消费者消费:消费者不是直接到CommitLog中读取消息,而是到Topic下的消息队列(consumerQueue)中读取消息
- CommitLog的数据会形成索引(commitoffset/msgsize/tagHashCode)存储到该消息对应的topic下的消费队列中去
- 把所有消息的索引信息根据Topic进行分类转发到不同的consumerQueue(消息队列)中去
- 即消息队列中存储的是commitoffset/msgsize/tagHashCode,每个消息用这3个字段确定在CommitLog位置(commitoffset/msgsize/tagHashCode)
- 对于消费者消费的消息,首先根据Topic,偏移量信息(broker)从consumerQueue中读取数据
页缓存与内存映射
- 页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
- 在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。
- 另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。
消息刷盘
- 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
- 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
负载均衡
- Producer负载均衡:
- Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。
- Consumer负载均衡
- 默认使用的是平均分配消费者队列
- 对于同一个消费者组, 必须使用相同的 负载均衡策略
- 在启动消费者的时候,对于集群模式下的同一个组(消费者组)的不同消费者会根据一定的策略绑定对应的队列,即消费者组中的每个消费节点都会对应一个队列,因此 消费者数量<=队列数量
集群环境搭建
- 该节讲了使用腾讯云的服务器来部署mq集群,如果后序使用,回看即可
- 集群规划
- NameServer3台:192.168.48.100: 9876/192.168.48.101: 9876/192.168.48.102: 9876
- Broker规划3组
- broker-a
- Master:192.168.48.100: 10911
- Slave:192.168.48.101: 10811
- Slave:192.168.48.102: 10711
- broker-b
- Master:192.168.48.101: 10911
- Slave:192.168.48.100: 10811
- Slave:192.168.48.102: 10711
- broker-c
- Master:192.168.48.102: 10911
- Slave:192.168.48.101: 10811
- Slave:192.168.48.100: 10711
- broker-a
- 每组broker下有一个主节点(写),2个从节点(读)
- 每一台主机(192.168.48.100)有一个主节点,2个从节点,注意: 这两个从节点是其他2个master的从节点,并不是当前broker下master的从节点
- 在腾讯云购买新建3台服务器实例
- 每台服务器配置4核16G 1M bps
-
新建完会生成3对服务器的地址(公网、内网)
42.194.183.141 外 172.16.16.13 内 106.55.28.87 外 172.16.16.6 内 106.55.16.63 外 172.16.16.5 内
给三台服务器分别安装JDK,配置JAVA_HOME
- 准备好jdk包:jdk-8u161-linux-x64.tar.gz、直接官网下载mq的二进制安装包:rocketmq-all-4.7.0-bin-release.zip
- 将上面2个包都拷贝到3台服务器的root/下
- 分别解压上面两个包到/usr/local/下:
tar -zxvf jdk-8u161-linux-x64.tar.gz -C /usr/local/ unzip rocketmq-all-4.7.0-bin-release.zip -d /usr/local/
-
分别给上面两个修改名称:
mv jdk1.8.0_161/ jdk1.8 mv rocketmq-all-4.7.0-bin-release/ rocketmq-4.7.0
-
配置环境变量JAVA_HOME
vi /etc/profile #最后行新增 export JAVA_HOME=/usr/local/jdk1.8 export PATH=$JAVA_HOME/bin:$PATH #以上保存修改,关闭,执行以下从新加载配置文件 source /etc/profile
rocketmq 的nameserver启动以及准备工作
-
调整3个服务器mq的运行内存占用
cd /usr/local/rocketmq-4.7.0/bin # 分别使用vim命令修改runbroker.sh和runserver.sh脚本 //runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" //runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
-
启动三台服务器的nameserver
# 在/usr/local/rocketmq-4.7.0目录下 nohup bin/mqnamesrv &
-
修改3台服务器的配置文件
# 在/usr/local/rocketmq-4.7.0目录下 cd conf # 创建目录3m-6s-async,代表3个主节点,6个从节点,异步数据 mkdir 3m-6s-async cd 3m-6s-async vi broker-a.properties
-
接下来就是要在3m-6s-async文件夹下新增配置文件了用来配置三主三从的broker
# 服务器 42.194.183.141 broker-a.properties broker-b-s.properties broker-c-s.properties # 服务器 106.55.28.87 broker-b.properties broker-a-s.properties broker-c-s.properties # 服务器 106.55.16.63 broker-c.properties broker-a-s.properties broker-b-s.properties
3台主机的master broker的配置文件
-
broker-a.properties
# 集群名字 brokerClusterName = DefaultCluster # broker的组名 brokerName = broker-a # 0 代表Master, 非0 代表Slave brokerId = 0 # 删除过期文件时间, 凌晨4点 deleteWhen = 04 # 文件保留时间48小时 fileReservedTime = 48 # broker的角色(同步、异步,通常用异步): ASYNC_MASTER ,SYNC_MASTER ,SLAVE brokerRole = ASYNC_MASTER # 数据刷盘模式 异步刷盘: ASYNC_FLUSH 同步刷盘: SYNC_FLUSH flushDiskType = ASYNC_FLUSH # 当前broker的ip地址,可以配置一个外网的,也可以配置一个内网的,也可以都配置一个 brokerIP1=42.194.183.141 brokerIP2=172.16.16.13 # NameServer的地址信息,三台主机之间通信,都使用内网地址 namesrvAddr=172.16.16.13:9876;172.16.16.6:9876;172.16.16.5:9876 # 数据存储的地方 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store storePathCommitLog=/usr/local/rocketmq-4.7.0/data/broker-a/store/commitlog # 是否自动创建主题(topic), 建议关闭;即必须在MQ的管理端先创建好Topic,然后代码中使用这个创建好的topic才可以发送消息,不能直接使用代码任意写的Topic发消息 autoCreateTopicEnable=false # 启动运行的监听端口 listenPort=10911 # 高可用集群使用 # 把commitLog交给DLeger管理 enableDLegerCommitLog=true # 存储路径 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store/dledger_store # 对于不同的broker需要添加不不同的组别 dLegerGroup=broker-a # 参与投票选举的通信操作 dLegerPeers=n0-172.16.16.13:40911;n1-172.16.16.6:40911;n2-172.16.16.5:40911 # 角色id, 必须唯一 dLegerSelfId=n0
-
broker-b.properties
# 只写与broker-a不同的地方,其余略 # broker的组名 brokerName = broker-b # 当前broker的ip地址,可以配置一个外网的,也可以配置一个内网的,也可以都配置一个 brokerIP1=106.55.28.87 brokerIP2=172.16.16.6 # 数据存储的地方 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-b/store storePathCommitLog=/usr/local/rocketmq-4.7.0/data/broker-b/store/commitlog # 对于不同的broker需要添加不不同的组别 dLegerGroup=broker-b # 参与投票选举的通信操作 dLegerPeers=n3-172.16.16.13:40811;n4-172.16.16.6:40811;n5-172.16.16.5:40811 # 角色id, 必须唯一 dLegerSelfId=n3
-
broker-c.properties
# 只写与broker-a不同的地方,其余略 # broker的组名 brokerName = broker-c # 当前broker的ip地址,可以配置一个外网的,也可以配置一个内网的,也可以都配置一个 brokerIP1=106.55.16.63 brokerIP2=172.16.16.5 # 数据存储的地方 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-c/store storePathCommitLog=/usr/local/rocketmq-4.7.0/data/broker-c/store/commitlog # 对于不同的broker需要添加不不同的组别 dLegerGroup=broker-c # 参与投票选举的通信操作 dLegerPeers=n6-172.16.16.13:40711;n7-172.16.16.6:40711;n8-172.16.16.5:40711 # 角色id, 必须唯一 dLegerSelfId=n6
-
启动3个主机的master broker
nohup bin/mqbroker -c conf/3m-6s-async/broker-a.properties & nohup bin/mqbroker -c conf/3m-6s-async/broker-b.properties & nohup bin/mqbroker -c conf/3m-6s-async/broker-c.properties & # 使用jps查看是否启动成功
3台主机的slave broker的配置文件
-
42.194.183.141 主机
# broker-b-s.properties brokerClusterName = DefaultCluster brokerName = broker-b # 0 代表Master, 非0 代表Slave brokerId = 2 # 删除过期文件时间, 凌晨4点 deleteWhen = 04 # 文件保留时间48小时 fileReservedTime = 48 # broker的角色: ASYNC_MASTER ,SYNC_MASTER ,SLAVE brokerRole = SLAVE # 数据刷盘模式 异步刷盘: ASYNC_FLUSH 同步刷盘: SYNC_FLUSH flushDiskType = ASYNC_FLUSH # broker的ip地址 brokerIP1=42.194.183.141 brokerIP2=172.16.16.13 # NameServer的地址信息 namesrvAddr=172.16.16.13:9876;172.16.16.6:9876;172.16.16.5:9876 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-b/store storePathCommitLog=/usr/local/rocketmq-4.7.0/data/broker-b/store/commitlog # 是否自动创建主题, 建议关闭 autoCreateTopicEnable=false # 启动运行的监听端口 listenPort=10711 # 高可用集群使用 # 把commitLog交给DLeger管理 enableDLegerCommitLog=true # 存储路径 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store/dledger_store # 对于不同的broker需要添加不不同的组别 dLegerGroup=broker-b # 参与投票选举的通信操作 dLegerPeers=n3-172.16.16.13:40811;n4-172.16.16.6:40811;n5-172.16.16.5:40811 # 角色id, 必须唯一 dLegerSelfId=n4 ---------------------- # broker-c-s.properties brokerClusterName = DefaultCluster brokerName = broker-c # 0 代表Master, 非0 代表Slave brokerId = 1 # 删除过期文件时间, 凌晨4点 deleteWhen = 04 # 文件保留时间48小时 fileReservedTime = 48 # broker的角色: ASYNC_MASTER ,SYNC_MASTER ,SLAVE brokerRole = SLAVE # 数据刷盘模式 异步刷盘: ASYNC_FLUSH 同步刷盘: SYNC_FLUSH flushDiskType = ASYNC_FLUSH # broker的ip地址 brokerIP1=42.194.183.141 brokerIP2=172.16.16.13 # NameServer的地址信息 namesrvAddr=172.16.16.13:9876;172.16.16.6:9876;172.16.16.5:9876 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-c/store storePathCommitLog=/usr/local/rocketmq-4.7.0/data/broker-c/store/commitlog # 是否自动创建主题, 建议关闭 autoCreateTopicEnable=false # 启动运行的监听端口 listenPort=10811 # 高可用集群使用 # 把commitLog交给DLeger管理 enableDLegerCommitLog=true # 存储路径 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store/dledger_store # 对于不同的broker需要添加不不同的组别 dLegerGroup=broker-c # 参与投票选举的通信操作 dLegerPeers=n6-172.16.16.13:40711;n7-172.16.16.6:40711;n8-172.16.16.5:40711 # 角色id, 必须唯一 dLegerSelfId=n7
-
106.55.28.87 主机:
# broker-a-s.properties rokerClusterName = DefaultCluster brokerName = broker-a # 0 代表Master, 非0 代表Slave brokerId = 1 # 删除过期文件时间, 凌晨4点 deleteWhen = 04 # 文件保留时间48小时 fileReservedTime = 48 # broker的角色: ASYNC_MASTER ,SYNC_MASTER ,SLAVE brokerRole = SLAVE # 数据刷盘模式 异步刷盘: ASYNC_FLUSH 同步刷盘: SYNC_FLUSH flushDiskType = ASYNC_FLUSH # broker的ip地址 brokerIP1=106.55.28.87 brokerIP2=172.16.16.6 # NameServer的地址信息 namesrvAddr=172.16.16.13:9876;172.16.16.6:9876;172.16.16.5:9876 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store storePathCommitLog=/usr/local/rocketmq-4.7.0/data/broker-a/store/commitlog # 是否自动创建主题, 建议关闭 autoCreateTopicEnable=false # 启动运行的监听端口 listenPort=10811 # 高可用集群使用 # 把commitLog交给DLeger管理 enableDLegerCommitLog=true # 存储路径 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store/dledger_store # 对于不同的broker需要添加不不同的组别 dLegerGroup=broker-a # 参与投票选举的通信操作 dLegerPeers=n0-172.16.16.13:40911;n1-172.16.16.6:40911;n2-172.16.16.5:40911 # 角色id, 必须唯一 dLegerSelfId=n1 ---------------------- # broker-c-s.properties brokerClusterName = DefaultCluster brokerName = broker-c # 0 代表Master, 非0 代表Slave brokerId = 2 # 删除过期文件时间, 凌晨4点 deleteWhen = 04 # 文件保留时间48小时 fileReservedTime = 48 # broker的角色: ASYNC_MASTER ,SYNC_MASTER ,SLAVE brokerRole = SLAVE # 数据刷盘模式 异步刷盘: ASYNC_FLUSH 同步刷盘: SYNC_FLUSH flushDiskType = ASYNC_FLUSH # broker的ip地址 brokerIP1=106.55.28.87 brokerIP2=172.16.16.6 # NameServer的地址信息 namesrvAddr=172.16.16.13:9876;172.16.16.6:9876;172.16.16.5:9876 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-c/store storePathCommitLog=/usr/local/rocketmq-4.7.0/data/broker-c/store/commitlog # 是否自动创建主题, 建议关闭 autoCreateTopicEnable=false # 启动运行的监听端口 listenPort=10711 # 高可用集群使用 # 把commitLog交给DLeger管理 enableDLegerCommitLog=true # 存储路径 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store/dledger_store # 对于不同的broker需要添加不不同的组别 dLegerGroup=broker-c # 参与投票选举的通信操作 dLegerPeers=n6-172.16.16.13:40711;n7-172.16.16.6:40711;n8-172.16.16.5:40711 # 角色id, 必须唯一 dLegerSelfId=n8
-
106.55.16.63 主机
# broker-a-s.properties brokerClusterName = DefaultCluster brokerName = broker-a # 0 代表Master, 非0 代表Slave brokerId = 2 # 删除过期文件时间, 凌晨4点 deleteWhen = 04 # 文件保留时间48小时 fileReservedTime = 48 # broker的角色: ASYNC_MASTER ,SYNC_MASTER ,SLAVE brokerRole = SLAVE # 数据刷盘模式 异步刷盘: ASYNC_FLUSH 同步刷盘: SYNC_FLUSH flushDiskType = ASYNC_FLUSH # broker的ip地址 brokerIP1=106.55.16.63 brokerIP2=172.16.16.5 # NameServer的地址信息 namesrvAddr=172.16.16.13:9876;172.16.16.6:9876;172.16.16.5:9876 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store storePathCommitLog=/usr/local/rocketmq-4.7.0/data/broker-a/store/commitlog # 是否自动创建主题, 建议关闭 autoCreateTopicEnable=false # 启动运行的监听端口 listenPort=10811 # 高可用集群使用 # 把commitLog交给DLeger管理 enableDLegerCommitLog=true # 存储路径 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store/dledger_store # 对于不同的broker需要添加不不同的组别 dLegerGroup=broker-a # 参与投票选举的通信操作 dLegerPeers=n0-172.16.16.13:40911;n1-172.16.16.6:40911;n2-172.16.16.5:40911 # 角色id, 必须唯一 dLegerSelfId=n2 ---------------------- # broker-b-s.properties brokerClusterName = DefaultCluster brokerName = broker-b # 0 代表Master, 非0 代表Slave brokerId = 1 # 删除过期文件时间, 凌晨4点 deleteWhen = 04 # 文件保留时间48小时 fileReservedTime = 48 # broker的角色: ASYNC_MASTER ,SYNC_MASTER ,SLAVE brokerRole = SLAVE # 数据刷盘模式 异步刷盘: ASYNC_FLUSH 同步刷盘: SYNC_FLUSH flushDiskType = ASYNC_FLUSH # broker的ip地址 brokerIP1=106.55.16.63 brokerIP2=172.16.16.5 # NameServer的地址信息 namesrvAddr=172.16.16.13:9876;172.16.16.6:9876;172.16.16.5:9876 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-b/store storePathCommitLog=/usr/local/rocketmq-4.7.0/data/broker-b/store/commitlog # 是否自动创建主题, 建议关闭 autoCreateTopicEnable=false # 启动运行的监听端口 listenPort=10711 # 高可用集群使用 # 把commitLog交给DLeger管理 enableDLegerCommitLog=true # 存储路径 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store/dledger_store # 对于不同的broker需要添加不不同的组别 dLegerGroup=broker-b # 参与投票选举的通信操作 dLegerPeers=n3-172.16.16.13:40811;n4-172.16.16.6:40811;n5-172.16.16.5:40811 # 角色id, 必须唯一 dLegerSelfId=n5
-
3台主机分别启动对应的2个从节点
# 服务器 42.194.183.141 nohup bin/mqbroker -c conf/3m-6s-async/broker-b-s.properties & nohup bin/mqbroker -c conf/3m-6s-async/broker-c-s.properties & # 服务器 106.55.28.87 nohup bin/mqbroker -c conf/3m-6s-async/broker-a-s.properties & nohup bin/mqbroker -c conf/3m-6s-async/broker-c-s.properties & # 服务器 106.55.16.63 nohup bin/mqbroker -c conf/3m-6s-async/broker-a-s.properties & nohup bin/mqbroker -c conf/3m-6s-async/broker-b-s.properties &
端口的意义
- 在启动Broker的时候, 默认情况下我们指定的listenPort=10911, 但是在真正运行的时候, 会启动10911 10909 10912 三个端口
-
解释如下:
haListenPort: 10912 在数据进行主从同步的端口(Master Slave之间通信的端口) listenPort=10911 负责生产者和消费者和Broker进行连接通信 vip_Channel_port=10909 负责生产者 创建Topic, 发送Message的端口
高可用集群Dledger
- 从上面的集群搭建可以看出,并没有完全高可用
- 一个master挂了,2个slave不能自动切换为master
- 3个master都挂了,无法进行消息写入
- 通过多副本机制Dledger可以完成Master宕机之后, 对于Slave角色可以自动切换为Master, 完成角色的自动转变, 实现高可用,通常可以解决以下场景
- 如果有顺序消息的应用,必须要写到某个队列中去
- 可以提高系统的吞吐量
- 在原来的配置中添加配置
- 每个节点配置的变化的地方已经标注“变化”
# 把commitLog交给DLeger管理 enableDLegerCommitLog=true # 存储路径 storePathRootDir=/usr/local/rocketmq-4.7.0/data/broker-a/store/dledger_store # 对于不同的broker需要添加不不同的组别, 变化:broker-a/broker-b/broker-c dLegerGroup=broker-a # 参与投票选举的通信操作, 变化:端口,broker-a/broker-b/broker-c分别为40911、40811、40711 nx,根据不同的broker组要设置不同n0/n1/n2、n3/n4/n5、 n6/n7/n8 dLegerPeers=n0-172.16.16.13:40911;n1-172.16.16.6:40911;n2-172.16.16.5:40911 # 角色id, 必须唯一,变化:master、slave1、slave2分别为n0、n1、n2 dLegerSelfId=n0
- 注意: 在4.5 版本以后才有