Java进阶-RocketMQ基础
消息中间件概述
- 不同系统之间的数据通信
- 方式一:通过RPC请求远程直接调用物流服务(Dubbo、SpringCloud)
- 方式二:通过消息中间件(RocketMQ、KafKa、RabbitMQ)完成消息的存储和转发
应用场景
- 异步解耦
- 在一个庞大的业务系统中, 通过对于主要业务模块和其他业务模块之间进行业务解耦
- 可以保障整体系统的稳定性, 比如物流系统在某一个时刻出现故障, 并不会影响用户下单操作
- 提高用户下单的响应速度, 比如以前没有消息中间件, 需要400ms的处理时间, 现在引入消息中间件只需要110ms的处理时间
-
如下图
- 如果直接通过RPC通信,那么业务是串行的
- 订单服务首先通过RPC访问库存服务,拿到库存数据(100ms)
- 然后在去调用物流服务通知物流(100ms)
- 然后再去调用积分服务修改积分(100ms)
- 整体耗时(创建订单100ms)400ms
- 如果使用消息中间件MQ,业务是异步的
- 订单服务只需要将获取库存、通知物流、修改积分都直接发送给MQ
- 库存服务、物流服务、积分服务自己去到MQ中去消费消息
- 任意一个服务挂了,不影响其他服务
- 耗时110ms
- 如果直接通过RPC通信,那么业务是串行的
- 削峰填谷
- 其中在我们应用系统架构的时候引入了消息中间件MQ之后, 我们所有的用户请求全部先到达消息中间件MQ, 然后业务系统从MQ读取对应的消息进行业务处理
- 假设业务系统的处理的峰值是1w/qps,
- 当用户请求的qps在低于一万的时候, 用户的请求可以正常的处理
- 当用户请求在某一个时刻突然高于1w/qps的时候, 比如在达到3w/qps的时候, 对于业务系统则可以达到自己的处理封装1w/qps, 消息中间件则可以达到3w/qps,这个时候会有很多的消息在消息中间件堆积,等待业务系统处理,这时消息中间件起到一个削峰的作用
- 当业务系统相对来说比较空闲的时候, 用户的请求低于1w/qps的时候,那么对于mq的处理能力是和用户处理能力一样, 但是对于应用系统可能还是在维持一个峰值(1w/qps)进行业务处理, 这是消息中间件起到一个填谷的作用
-
下图
- 消息分发
- 在双十一, 618这种大促销活动, 商家的同一件商品可能会有很多的分会场(天猫、京东等),对于价格的一个变化需要及时的通知到分会场
- 如果使用数据库,一个价格更新后直接修改数据,其他分会场直接去访问数据库,此时数据库并发量比较大,导致页面响应缓慢;而且其他分会场并不知道数据库何时更新的数据,不能及时更新。
- 可以通过MQ构建分布式缓存, 及时的通知到分会场商品数据的变化
- 数据实时更新
- 降低页面响应时间
- 满足大规模的数据访问
常见的消息中间件
- 对于消息中间件最关心的两个指标
- 高吞吐
- 低延迟
- 所谓高吞吐, 指的是并发处理能力非常好,比如单机情况下, KafKa可以达到百万级的处理能力, RocketMQ可以达到十万级的处理能力
- 所谓低延迟, 指的是对于请求处理的响应时间, 比如RocketMQ可以做到1ms内响应的延迟超过99.6%。注意理解: 这里是生产者发送消息到MQ后得到的回执响应时间,并不是消费消息的时间。
- 常见的消息中间件
- ActiveMQ
- ActiveMQ是Apache出品,比较老的一个开源的消息中间件, 是一个完全支持JMS规范的消息中间件.
- API丰富,以前在中小企业应用广泛,现在比较少企业使用
- RabbitMQ
- RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
- KafKa
- Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
- 在大数据领域和日志处理解决方案用的比较多
- RocketMQ
- RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于 2017 年 9 月 25 日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。
- 淘宝内部的交易系统使用了淘宝自主研发的 Notify 消息中间件,使用 MySQL 作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011 年初,Linkin开源了 Kafka 这个优秀的消息中间件,淘宝中间件团队在对 Kafka 做过充分 Review 之后, Kafka 无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用 Java 语言编写了 RocketMQ ,定位于非日志的可靠消息传输(日志场景也OK),目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binlog 分发等场景。
-
消息中间件对比
- ActiveMQ
RocketMQ的核心特性
- 低延迟
- 1ms内响应的延迟超过99.6%,就是在1ms内响应超过99.6%
- 高稳定性
- 阿里巴巴双十一官方指定消息产品,支撑阿里巴巴集团所有的消息服务,历经十余年高可用与高可靠的严苛考验,是阿里巴巴交易链路的核心产品;
- 服务可用性 99.95%,Region 化、多可用区、分布式集群化部署,确保服务高可用,即便整个机房不可用仍可正常提供消息服务;
- 数据可靠性 99.99999999%,同步双写、超三副本数据冗余与快速切换技术确保数据可靠;
- 高性能
- 历年双 11 购物狂欢节零点千万级 TPS、万亿级数据洪峰,创造了全球最大的业务消息并发以及流转纪录(日志类消息除外);
- 在始终保证高性能前提下,支持亿级消息堆积,不影响集群的正常服务,在削峰填谷(蓄洪)、微服务解耦的场景下尤为重要;
- 丰富的消息类型
- 提供丰富的消息类型,满足各种严苛场景下的高级特性需求,当前支持的消息类型涵盖普通消息、顺序消息(全局顺序 / 分区顺序)、分布式事务消息、定时消息/延时消息;
RocketMQ的核心组件
运行模型(重点!!!)
- 发送消息业务的称为生产者Producer;获取消息中心的消息的业务称为消费者Consumer
- 消息中心有2个重要的角色
- NameServer:名称服务
- Broker(代理):存储消息、消息转发
- Broker里面包含Topic:
- 生产者发送的消息分为各种类型,比如订单消息、库存消息、日志消息等,因此在Broker里面存储按照类型存储,消息的类型叫做Topic
- 比如:有TopicA、TopicB。。。,订单消息存储到TopicA中。。。。
- 实际开发建议:一个应用最好只有一个Topic,对于消息类型可以使用Tag标记进行区分
- Topic里面包含Queue
- 如果消费者是一个集群部署,即n个消费者,那么就需要n个消费者同时到对应的Topic中获取消息,因此Topic中存储的消息就需要多线程并发发送
- 将某类消息存储到队列Queue中,供消费者集群来消费
- 因此某个Topic的队列Queue数量跟消费者集群的节点量有关,可以进行配置,比如:消费者集群有10个节点,那么Topic的消息队列可以配置为10个
- 队列中的每一条消息成为Message对象
- NameServer的作用
- Broker也可以部署成一个集群,那么当生产者需要给消息中心发送消息的时候,首先需要知道发送给哪个Broker
- 此时NameServer就是起到了这个作用,跟ZK一样。生产者第一次发送给NameServer,获取到对应的Broker,然后发送消息给这个Broker
- 同理消费者到消息中心获取消息,首先也是发送给NameServer,获取到对应的Broker,然后发送给这个Broker获取信息
- 消费者消费的时候会告诉Broker到底获取哪个类型Topic的消息
- 生产者发送消息给消息中心、消费者到消息中心获取数据或者消息中心推送给消费者数据,网络通信使用的是一个Netty框架,专门用来远程传递消息数据的
- 注意 这个Netty框架与RPC框架不同,RPC框架是远程的实例方法调用,而这个Netty框架只是用来传输消息数据
- 各个模块启动顺序如下:
- 启动服务器NameServer,NameServer的作用类似于ZK这样的注册中心, 主要用于存储元数据的管理, 比如每个Topic的位置信息
- 启动服务器Broker, Broker是数据处理服务器,对于不同的消息, 存储在不同的Topic中, 在同一个Topic, 为了提高消息处理的并发能力, 一个Topic会有多个Queue队列
- 启动生产者, 连接NameServer, 获取对应的Topic信息, 开始创建消息并发送
- 启动消费者, 连接NameServer, 获取对应的Topic信息, 开始消费消息
单机环境安装
RocketMQ的安装
- 安装包下载:
- 打开官网https://rocketmq.apache.org/docs/quick-start/->Download & Build from Release
-
有两种安装方式,源码、二进制
Click here to download the 4.7.0 source release. Also you could download a binary release from here.
- 这里使用源码安装:源码安装和二进制差不多,主要的操作步骤, 是通过源码去生成对应的二进制文件
- 下载源码
rocketmq-all-4.7.0-source-release.zip
- 环境依赖准备
- JDK 1.8+(略)
- Maven 3.2(大于3.2)(略)
- 在终端输入:
java -version
与mvn -v
查看是否安装 - 配置java环境变量(mac系统)
- 终端输入:
/usr/libexec/java_home
可以得到JAVA_HOME 的路径:/Library/Java/JavaVirtualMachines/jdk1.8.0_231.jdk/Contents/Home
- 在终端输入
sudo vim /etc/profile
-
输入i,添加如下配置:
JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home" export JAVA_HOME CLASS_PATH="$JAVA_HOME/lib" PATH=".$PATH:$JAVA_HOME/bin"
- 按ESC,输入
:wq!
保存 - 要想马上生效,输入
source /etc/profile
- 检查环境,输入
echo $JAVA_HOME
- 终端输入:
- 源码编译
- 将安装包上传到Linux的文件目录下:
/usr/local/software
文件夹下 -
将安装包源码解压到指定目录
unzip rocketmq-all-4.7.0-source-release.zip -C /usr/local/src
- 进入到源码目录
cd /usr/local/src/rocketmq-all-4.7.0-source-release
-
执行Maven构建生成编译后的二进制文件
#-P:执行环境,pom文件中pofile配置的环境release-all -DskipTests: 跳过测试 -U:强制更新 mvn -Prelease-all -DskipTests clean install -U
-
找到编译后的文件目录
cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0
-
可以将这个可执行软件复制到相应目录下
#先执行 cd .. 到上级目录,然后执行拷贝迁移 cp -R rocketmq-4.7.0/ /usr/local/
- 将安装包上传到Linux的文件目录下:
- 修改配置参数
- 为了保证RocketMQ可以正常启动, 默认情况会使用比较大的内存(4G), 建议给NameServer和Broker设置为1G的内存
- 进入到rocketmq-4.7.0/bin
-
分别使用vim命令修改runbroker.sh和runserver.sh脚本
//runserver.sh //默认 JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" //修改为 JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512M -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" //runbroker.sh //默认 JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" //如果内存不够,可以修改 JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512M"
-
启动nameserver、Broker
//nameserver //在rocketmq-4.7.0目录下执行下面命令 nohup sh bin/mqnamesrv & //控制台出现下面说明启动成功 [1] 9377 appending output to nohup.out //Broker //端口写启动,需要告诉broker nameserver的地址跟端口 !!! nohup sh bin/mqbroker -n 127.0.0.1:9876 & //控制台出现下面说明启动成功 [1] 9509 appending output to nohup.out
-
日志查看
tail -f ~/logs/rocketmqlogs/namesrv.log tail -f ~/logs/rocketmqlogs/broker.log
-
查看nameserver和broker是否成功启动
//输入 jps //如果出现NameserverStartup和BrokerStartup就说明启动成功了。 3384 BrokerStartup 3433 Jps 3370 NamesrvStartup [3]+ Exit 255
-
注意:
- namesrv这个服务默认的端口是9876
- broker默认的端口是10911
- 这两个端口通常不会修改
-
-
关闭服务
//关闭Borker sh bin/mqshutdown broker # 关闭NameServer sh bin/mqshutdown namesrv
- 测试发消息
- 配置NAMESRV_ADDR到环境变量里
cd /usr/local/rocketmq-4.7.0
- 执行
export NAMESRV_ADDR=127.0.0.1:9876
-
生产者发送消息到mq
//输入命令,当前在 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
-
消费者获取消息
//输入命令 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
- 配置NAMESRV_ADDR到环境变量里
管理控制台安装
- 管理控制台也是一个springboot项目
- 在github查找rocketmq-externals项目,然后克隆下来
- rocketmq-console已更名为rocketmq-dashboard并转让了新的回购协议。因此,下面的rocketmq-console 替换成 rocketmq-dashboard
-
将该项目克隆到指定目录下
cd /usr/local/src git clone https://gitee.com/heshengjun/rocketmq-externals.git
- 编译源码
-
进入到管理控制台项目
cd rocketmq-externals/rocketmq-console
-
编译生成可执行程序
mvn clean package -Dmaven.test.skip=true //执行完会生成一个target文件,里面有个rocketmq-console-ng-1.0.1.jar可执行程序
-
将该可执行程序放到一个指定目录
- 在
/usr/local/
下新建一个apps文件夹,然后将rocketmq-console-ng-1.0.1.jar 复制进去
- 在
-
- 启动管理控制台
- 启动rocket服务, 在启动管理控制台之前, 必须先启动NameServer
-
在启动目录(包含rocketmq-console-ng-1.0.1.jar的目录下
/usr/local/apps
)创建一个application.properties,配置信息如下:#当前服务的端口 server.port=9999 #地址为上面mq的nameserver的地址端口 rocketmq.config.namesrvAddr=127.0.0.1:9876
-
启动管理控制台
nohup java -jar rocketmq-console-ng-1.0.1.jar &
- 在浏览器输入:
http://localhost:9999/
能看到管理端页面- 注意: 要先启动mq的nameserver、broker
- 这个页面主要用于查看有哪些生产者生产了多少消息,有哪些消费者消费了多少消息
简单使用
- 新建一个项目(new Module)-》选择Maven->next->name: 13-rocketmq-demo Atrifact coordinates展开:groupid: com.zh.demo Atrifactid:rocketmq-demo -> finished
-
pom文件添加mq依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency>
- src下新建包名com.zh.demo._01_hello
-
新建消息生产者测试类:ProducerDemo
package com.zh.demo._01_hello; public class ProducerDemo { public static void main(String[] args) throws Exception { //1 创建一个生成者对象:参数为:生产者组 DefaultMQProducer producer = new DefaultMQProducer("group01"); //2 设置NameServer的地址 producer.setNamesrvAddr("127.0.0.1:9876"); //3 启动生产者 producer.start(); //4 创建消息Message:第一个参数为topic(字符串),第二个参数为消息体(字节数组) Message msg = new Message("01_hello", "hello, NBA".getBytes()); //5 发送消息 SendResult result = producer.send(msg); System.out.println(JSON.toJSONString(result)); //6 关闭生产者 producer.shutdown(); } }
- 新建消息消费者类
- 注意:消费者消费消息中心的消息有2种方式
- 消息中心主动推送给消费者(最常用)
- 消费者主动到消息中心拉取
package com.zh.demo._01_hello; public class ConsumerDemo { public static void main(String[] args) throws Exception { //1 创建一个消费者对象,使用DefaultMQPushConsumer类,代表是消息中心推送给消费者 //使用DefaultLitePullConsumer类,主动到消息中心拉取的方式 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2 设置NameServerAddr consumer.setNamesrvAddr("127.0.0.1:9876"); //3 先订阅消息主题(topic),采取消息中心直接推动给消费者的方式,第二个参数表示过滤消息,*代表所有消息都不过滤 consumer.subscribe("01_hello","*"); //4 注册监听消息的处理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消费数据成功:"+new String(msg.getBody())); } //只有返回给消息中心,消费成功,消息中心才不会再次推送这个消息,否则还会继续推动这个消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5 启动消费者 consumer.start(); } }
- 注意:消费者消费消息中心的消息有2种方式
- 注意:
- 上面的生产者、消费者地址都是写死的,在实际开发中通常分为测试、生产地址,因此不能写死
- 解决方法有2种:
-
配置环境变量:(window、mac都可以配置)
//windows配置环境变量(系统变量) NAMESRV_ADDR 192.168.48.102:9876 //Mac配置 export NAMESRV_ADDR=localhost:9876
-
通过配置JVM参数配置
// 在VM Options NAMESRV_ADDR=localhost:9876
-
- 配置完成,上面的 “设置NameServerAddr“代码就可以删除,启动的时候到环境变量去找,或者启动参数中查找
生产者发送消息
发送消息的三种方式
- 同步发送消息
- 同步, 指的是在发送数据到消息中间件的时候, 需要及时的返回一个结果到发送者
- 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
- 异步发送消息
- 在异步发送消息的时候, 我们把数据发送到消息中心, 此时我们不会等待消息中心的返回结果, 而是程序继续往下执行, 当有结果返回的时候, 通过异步通知的方式告诉消息生产者
- 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
- 一次性发送消息
- 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
- 发送出去不要结果
代码举例
- 异步发送
- 由于是异步的,必须消息响应完成才能关闭线程
package com.zh.demo._02_async; public class ProducerDemo { public static void main(String[] args) throws Exception{ //1 创建一个生成者对象 DefaultMQProducer producer = new DefaultMQProducer("async"); //2 设置NameServer的地址;这里不用写,会自动到环境变量去找,或者启动参数中查找 //3 启动生产者 try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } //4 创建消息Message //5 发送消息 //创建一个计数器 final CountDownLatch2 countDownLatch = new CountDownLatch2(5); for (int i = 0; i < 5; i++) { Message msg = new Message("02_async", "hello, NBA".getBytes()); try { producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(Thread.currentThread().getName()); if(sendResult.getSendStatus()== SendStatus.SEND_OK){ System.out.println("消息发送成功"+ JSON.toJSONString(sendResult)); } countDownLatch.countDown(); } //异常处理 @Override public void onException(Throwable e) { e.printStackTrace(); countDownLatch.countDown(); } }); } catch (MQClientException e) { //补救措施 把对应的消息先保存到另外一个地方 MySQL, 自己到时候重新触发 发送消息 e.printStackTrace(); } catch (RemotingException e) { //补救措施 e.printStackTrace(); } catch (InterruptedException e) { //补救措施 e.printStackTrace(); } } // 如果使用异步操作, 需要等待接收完所有的异步返回结果之后, 再去关闭主线程 countDownLatch.await();// 等待计数器归0 //6 关闭生产者 producer.shutdown(); } }
-
一次性发送
package com.zh.demo._03_oneway; public class ProducerDemo { public static void main(String[] args) throws Exception { //1 创建一个生成者对象 DefaultMQProducer producer = new DefaultMQProducer("03_oneway"); //2 设置NameServer的地址;这里不用写,会自动到环境变量去找,或者启动参数中查找 //3 启动生产者 producer.start(); //4 创建消息Message //5 发送消息 //创建一个计数器 for (int i = 0; i < 5; i++) { // Tag: 消息标签, 用来进行消息分类 支付消息--> wx支付,zfb支付 // Key: 业务消息 订单消息: key: 订单id/订单号 Message msg = new Message("03_oneway","TagB",i+"","hello, NBA".getBytes()); producer.sendOneway(msg);//日志, 大数据应用 } //6 关闭生产者 producer.shutdown(); } }
发送消息补充
- 生产者组
- 创建生产者时需要传递一个参数producerGroup:生产者组
- 同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
- 如果生产者发送完消息到消息中心,消息中心也需要发送消息给生产者,但是此时如果之前的那个生产者挂了,就需要发送到其他同类的生产者回调
- 如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
- 消息封装(就是给消息设置一定的特征,让指定消费者去消费)
- Tag
- 用来给消息进行标记, 可以通过Tag对消息进行分类, 把不同类型的消息交给不同的消费者进行消费
- Key
- Key: 可以设置消息的一个唯一ID, 用于区分每个消息的标志, 业务ID
- 并且在管理控制台,可以通过Key进行消息的查询跟踪
// Tag: 消息标签, 用来进行消息分类 支付消息--> wx支付,zfb支付 // Key: 业务消息 订单消息: key: 订单id/订单号 Message msg = new Message("03_oneway","TagB",i+"","hello, NBA".getBytes());
- Tag
消费者消费消息
两种消费方式
- 拉式消费
- 消费者到消息中心拉取消息,进行消费
- 最早期使用DefaultMQPullConsumer(PULL 模式)类来进行消息拉取,但是该类的 API 太底层,使用起来及其不方便,RocketMQ 官方设计者也注意到这个问题,为此在 RocketMQ 4.6.0 版本中引入了 PULL 模式的另外一个实现类 DefaultLitePullConsumer,即从 4.6.0 版本后,DefaultMQPullConsumer 已经被标记为废弃,故接下来将重点介绍 DefaultLitePullConsumer,并探究如何在实际中运用它解决相关问题。
public class PullConsumer { public static void main(String[] args) throws Exception { //1 创建一个消费者对象 DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("type"); //2 设置NameServerAddr // 生产环境, 测试环境 NameServer 不一样 //3 先订阅消息主题 consumer.subscribe("04_type","*"); //每次拉取的消息量,默认是10 consumer.setPullBatchSize(10); //间隔多久拉取一次,也可以理解为每5s阻塞一次,默认5s consumer.setPollTimeoutMillis(5000); //设置拉取的线程数,默认20 consumer.setPullThreadNums(20); //5 启动消费者 consumer.start(); while (true){ //5s钟拉取一下 List<MessageExt> msgs = consumer.poll();//拉取间隔时间5s System.out.println(msgs); //手动提交 consumer.commitSync(); } } }
- 推式消费
- 消息中心主动推送给消费者,消费者需要提前订阅好消息中心的消费
/** * 推式消费 消息中心 主动把消息推送给 消费者 */ public class PushConsumerDemo { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("type"); consumer.setNamesrvAddr("192.168.48.102:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //订阅主题, 后面可以指定表达式Tag, 根据tag进行消息过滤 consumer.subscribe("01_hello","*"); //注册消息消费监听 有消息就会触发consumeMessage方法 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("接收消息线程: "+Thread.currentThread().getName()); for (MessageExt msg : msgs) { System.out.println("接收到的消息:"+new String(msg.getBody())); } //确定消息消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); } }
2种消费模式
- 集群模式
- 多个消费者消费一个队列,每个消费者消费不同的消息,提高消费能力
- 即:对于同一个消费者组里面的多个消费者, 每个消费者消费的消息都是不一样的, 相当于消费者的负载均衡
-
通过设置消费者的模式
consumer.setMessageModel(MessageModel.CLUSTERING);
- 广播模式
- 多个消费者消费相同的消息,一条消息被n个消费者消费
- 即:每个消费者都会接受全量的消息, 所有消费者消费的数据都是一样的
- 一般用于对于消息需要多个其他业务进行处理
-
通过设置消费者的模式
consumer.setMessageModel(MessageModel.BROADCASTING);
消息消费的位置
- 在指定消费的pos位置的时候, 会优先获取服务端记录的上次消费点, 所以该参数只有在服务端没有对应的消费者的记录的时候有效,一般情况是第一次启动的消费者有效
- 三种
- CONSUME_FROM_FIRST_OFFSET: 从最开始的位置消费, 会消费该Topic下面所有的有效的数据, 过期的数据会删除掉
- CONSUME_FROM_LAST_OFFSET
- 如果该Topic的数据都是最近的数据, 没有过期数据, 则从最开始的位置消费
- 如果该Topic的数据存在过期的数据, 则从最后的位置开始消费, 只会消费新加入的数据
- CONSUME_FROM_TIMESTAMP
- 根据指定的时间戳进行消费,配合
consumer.setConsumeTimestamp("20200612083300");
从指定的时间开始消费, 如果不指定, 则默认从半个小时前的数据开始消费
- 根据指定的时间戳进行消费,配合
-
代码举例:
public static void main(String[] args) throws Exception { //1 创建一个消费者对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("05_model"); //消息模式设置为集群 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("05_model","*"); // 根据指定的时间去消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp("20200623210000"); //4 注册监听消息的处理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消费Consumer001 数据成功:"+new String(msg.getBody())+"msg.getQueueId():"+msg.getQueueId()); } // 也会改变消费者的偏移量, 但是数据会放到重试队列 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); System.out.println("消费Consumer001启动成功"); //5 启动消费者 consumer.start(); }
消息确认
- 拉式消费(DefaultLitePullConsumer)
-
可以通过
consumer.setAutoCommit(false);
设置是否自动提交, 如果设置为手动提交, 需要使用consumer.commitSync();
方法进行手动提交//1 创建一个消费者对象 DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("05_model_002"); //2 设置NameServerAddr // 生产环境, 测试环境 NameServer 不一样 //3 先订阅消息主题 consumer.subscribe("05_model","*"); //取消自动提交 consumer.setAutoCommit(false); //5 启动消费者 consumer.start(); while (true){ List<MessageExt> msgs = consumer.poll();//拉取间隔时间5s System.out.println(msgs); //手动提交 consumer.commitSync(); }
-
- 推式消费(DefaultMQPushConsumer)
- 通过返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
状态表示消费成功 - 返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
表示消费不成功, 会放入到重试队列- 默认重试采用服务端重试: 重试次数:16次
- 代码示例如上面示例
- 通过返回