Java进阶-RocketMQ基础

消息中间件概述

  1. 不同系统之间的数据通信
    1. 方式一:通过RPC请求远程直接调用物流服务(Dubbo、SpringCloud)
    2. 方式二:通过消息中间件(RocketMQ、KafKa、RabbitMQ)完成消息的存储和转发

应用场景

  1. 异步解耦
    1. 在一个庞大的业务系统中, 通过对于主要业务模块和其他业务模块之间进行业务解耦
    2. 可以保障整体系统的稳定性, 比如物流系统在某一个时刻出现故障, 并不会影响用户下单操作
    3. 提高用户下单的响应速度, 比如以前没有消息中间件, 需要400ms的处理时间, 现在引入消息中间件只需要110ms的处理时间
    4. 如下图 图1

      1. 如果直接通过RPC通信,那么业务是串行的
        1. 订单服务首先通过RPC访问库存服务,拿到库存数据(100ms)
        2. 然后在去调用物流服务通知物流(100ms)
        3. 然后再去调用积分服务修改积分(100ms)
        4. 整体耗时(创建订单100ms)400ms
      2. 如果使用消息中间件MQ,业务是异步的
        1. 订单服务只需要将获取库存、通知物流、修改积分都直接发送给MQ
        2. 库存服务、物流服务、积分服务自己去到MQ中去消费消息
        3. 任意一个服务挂了,不影响其他服务
        4. 耗时110ms
  2. 削峰填谷
    1. 其中在我们应用系统架构的时候引入了消息中间件MQ之后, 我们所有的用户请求全部先到达消息中间件MQ, 然后业务系统从MQ读取对应的消息进行业务处理
    2. 假设业务系统的处理的峰值是1w/qps,
      1. 当用户请求的qps在低于一万的时候, 用户的请求可以正常的处理
      2. 当用户请求在某一个时刻突然高于1w/qps的时候, 比如在达到3w/qps的时候, 对于业务系统则可以达到自己的处理封装1w/qps, 消息中间件则可以达到3w/qps,这个时候会有很多的消息在消息中间件堆积,等待业务系统处理,这时消息中间件起到一个削峰的作用
      3. 当业务系统相对来说比较空闲的时候, 用户的请求低于1w/qps的时候,那么对于mq的处理能力是和用户处理能力一样, 但是对于应用系统可能还是在维持一个峰值(1w/qps)进行业务处理, 这是消息中间件起到一个填谷的作用
    3. 下图

      图1

  3. 消息分发
    1. 在双十一, 618这种大促销活动, 商家的同一件商品可能会有很多的分会场(天猫、京东等),对于价格的一个变化需要及时的通知到分会场
    2. 如果使用数据库,一个价格更新后直接修改数据,其他分会场直接去访问数据库,此时数据库并发量比较大,导致页面响应缓慢;而且其他分会场并不知道数据库何时更新的数据,不能及时更新。
    3. 可以通过MQ构建分布式缓存, 及时的通知到分会场商品数据的变化
      1. 数据实时更新
      2. 降低页面响应时间
      3. 满足大规模的数据访问

      图1

常见的消息中间件

  1. 对于消息中间件最关心的两个指标
    1. 高吞吐
    2. 低延迟
  2. 所谓高吞吐, 指的是并发处理能力非常好,比如单机情况下, KafKa可以达到百万级的处理能力, RocketMQ可以达到十万级的处理能力
  3. 所谓低延迟, 指的是对于请求处理的响应时间, 比如RocketMQ可以做到1ms内响应的延迟超过99.6%。注意理解: 这里是生产者发送消息到MQ后得到的回执响应时间,并不是消费消息的时间。
  4. 常见的消息中间件
    1. ActiveMQ
      1. ActiveMQ是Apache出品,比较老的一个开源的消息中间件, 是一个完全支持JMS规范的消息中间件.
      2. API丰富,以前在中小企业应用广泛,现在比较少企业使用
    2. RabbitMQ
      1. RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
    3. KafKa
      1. Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
      2. 在大数据领域和日志处理解决方案用的比较多
    4. RocketMQ
      1. RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于 2017 年 9 月 25 日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。
      2. 淘宝内部的交易系统使用了淘宝自主研发的 Notify 消息中间件,使用 MySQL 作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011 年初,Linkin开源了 Kafka 这个优秀的消息中间件,淘宝中间件团队在对 Kafka 做过充分 Review 之后, Kafka 无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用 Java 语言编写了 RocketMQ ,定位于非日志的可靠消息传输(日志场景也OK),目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binlog 分发等场景。
    5. 消息中间件对比

      图1

RocketMQ的核心特性

  1. 低延迟
    1. 1ms内响应的延迟超过99.6%,就是在1ms内响应超过99.6%
  2. 高稳定性
    1. 阿里巴巴双十一官方指定消息产品,支撑阿里巴巴集团所有的消息服务,历经十余年高可用与高可靠的严苛考验,是阿里巴巴交易链路的核心产品;
    2. 服务可用性 99.95%,Region 化、多可用区、分布式集群化部署,确保服务高可用,即便整个机房不可用仍可正常提供消息服务;
    3. 数据可靠性 99.99999999%,同步双写、超三副本数据冗余与快速切换技术确保数据可靠;
  3. 高性能
    1. 历年双 11 购物狂欢节零点千万级 TPS、万亿级数据洪峰,创造了全球最大的业务消息并发以及流转纪录(日志类消息除外);
    2. 在始终保证高性能前提下,支持亿级消息堆积,不影响集群的正常服务,在削峰填谷(蓄洪)、微服务解耦的场景下尤为重要;
  4. 丰富的消息类型
    1. 提供丰富的消息类型,满足各种严苛场景下的高级特性需求,当前支持的消息类型涵盖普通消息、顺序消息(全局顺序 / 分区顺序)、分布式事务消息、定时消息/延时消息;

RocketMQ的核心组件

运行模型(重点!!!

图1

  1. 发送消息业务的称为生产者Producer;获取消息中心的消息的业务称为消费者Consumer
  2. 消息中心有2个重要的角色
    1. NameServer:名称服务
    2. Broker(代理):存储消息、消息转发
  3. Broker里面包含Topic:
    1. 生产者发送的消息分为各种类型,比如订单消息、库存消息、日志消息等,因此在Broker里面存储按照类型存储,消息的类型叫做Topic
    2. 比如:有TopicA、TopicB。。。,订单消息存储到TopicA中。。。。
    3. 实际开发建议:一个应用最好只有一个Topic,对于消息类型可以使用Tag标记进行区分
  4. Topic里面包含Queue
    1. 如果消费者是一个集群部署,即n个消费者,那么就需要n个消费者同时到对应的Topic中获取消息,因此Topic中存储的消息就需要多线程并发发送
    2. 将某类消息存储到队列Queue中,供消费者集群来消费
    3. 因此某个Topic的队列Queue数量跟消费者集群的节点量有关,可以进行配置,比如:消费者集群有10个节点,那么Topic的消息队列可以配置为10个
    4. 队列中的每一条消息成为Message对象
  5. NameServer的作用
    1. Broker也可以部署成一个集群,那么当生产者需要给消息中心发送消息的时候,首先需要知道发送给哪个Broker
    2. 此时NameServer就是起到了这个作用,跟ZK一样。生产者第一次发送给NameServer,获取到对应的Broker,然后发送消息给这个Broker
    3. 同理消费者到消息中心获取消息,首先也是发送给NameServer,获取到对应的Broker,然后发送给这个Broker获取信息
  6. 消费者消费的时候会告诉Broker到底获取哪个类型Topic的消息
  7. 生产者发送消息给消息中心、消费者到消息中心获取数据或者消息中心推送给消费者数据,网络通信使用的是一个Netty框架,专门用来远程传递消息数据的
    1. 注意 这个Netty框架与RPC框架不同,RPC框架是远程的实例方法调用,而这个Netty框架只是用来传输消息数据
  8. 各个模块启动顺序如下:
    1. 启动服务器NameServer,NameServer的作用类似于ZK这样的注册中心, 主要用于存储元数据的管理, 比如每个Topic的位置信息
    2. 启动服务器Broker, Broker是数据处理服务器,对于不同的消息, 存储在不同的Topic中, 在同一个Topic, 为了提高消息处理的并发能力, 一个Topic会有多个Queue队列
    3. 启动生产者, 连接NameServer, 获取对应的Topic信息, 开始创建消息并发送
    4. 启动消费者, 连接NameServer, 获取对应的Topic信息, 开始消费消息

单机环境安装

RocketMQ的安装

  1. 安装包下载:
    1. 打开官网https://rocketmq.apache.org/docs/quick-start/->Download & Build from Release
    2. 有两种安装方式,源码、二进制

       Click here to download the 4.7.0 source release. Also you could download a binary release from here.
      
    3. 这里使用源码安装:源码安装和二进制差不多,主要的操作步骤, 是通过源码去生成对应的二进制文件
    4. 下载源码rocketmq-all-4.7.0-source-release.zip
  2. 环境依赖准备
    1. JDK 1.8+(略)
    2. Maven 3.2(大于3.2)(略)
    3. 在终端输入:java -versionmvn -v查看是否安装
    4. 配置java环境变量(mac系统)
      1. 终端输入:/usr/libexec/java_home可以得到JAVA_HOME 的路径:/Library/Java/JavaVirtualMachines/jdk1.8.0_231.jdk/Contents/Home
      2. 在终端输入 sudo vim /etc/profile
      3. 输入i,添加如下配置:

         JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home"
         export JAVA_HOME
         CLASS_PATH="$JAVA_HOME/lib"
         PATH=".$PATH:$JAVA_HOME/bin"
        
      4. 按ESC,输入:wq!保存
      5. 要想马上生效,输入source /etc/profile
      6. 检查环境,输入 echo $JAVA_HOME
  3. 源码编译
    1. 将安装包上传到Linux的文件目录下:/usr/local/software文件夹下
    2. 将安装包源码解压到指定目录

       unzip rocketmq-all-4.7.0-source-release.zip -C /usr/local/src
      
    3. 进入到源码目录 cd /usr/local/src/rocketmq-all-4.7.0-source-release
    4. 执行Maven构建生成编译后的二进制文件

       #-P:执行环境,pom文件中pofile配置的环境release-all  -DskipTests: 跳过测试 -U:强制更新
       mvn -Prelease-all -DskipTests clean install -U
      
    5. 找到编译后的文件目录

       cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0
      
    6. 可以将这个可执行软件复制到相应目录下

       #先执行 cd .. 到上级目录,然后执行拷贝迁移
       cp -R rocketmq-4.7.0/ /usr/local/
      
  4. 修改配置参数
    1. 为了保证RocketMQ可以正常启动, 默认情况会使用比较大的内存(4G), 建议给NameServer和Broker设置为1G的内存
    2. 进入到rocketmq-4.7.0/bin
    3. 分别使用vim命令修改runbroker.sh和runserver.sh脚本

       //runserver.sh
       //默认
       JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      
       //修改为
       JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512M -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
              
       //runbroker.sh
       //默认
       JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
       //如果内存不够,可以修改
       JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512M"
      
  5. 启动nameserver、Broker

     //nameserver
     //在rocketmq-4.7.0目录下执行下面命令
     nohup sh bin/mqnamesrv &
        
     //控制台出现下面说明启动成功
     [1] 9377
     appending output to nohup.out
        
     //Broker
     //端口写启动,需要告诉broker nameserver的地址跟端口 !!!
     nohup sh bin/mqbroker -n 127.0.0.1:9876 &
        
     //控制台出现下面说明启动成功
     [1] 9509
     appending output to nohup.out
    
    1. 日志查看

       tail -f ~/logs/rocketmqlogs/namesrv.log
       tail -f ~/logs/rocketmqlogs/broker.log 
      
    2. 查看nameserver和broker是否成功启动

       //输入
       jps
       //如果出现NameserverStartup和BrokerStartup就说明启动成功了。
       3384 BrokerStartup
       3433 Jps
       3370 NamesrvStartup
       [3]+  Exit 255  
      
    3. 注意:

      1. namesrv这个服务默认的端口是9876
      2. broker默认的端口是10911
      3. 这两个端口通常不会修改
  6. 关闭服务

     //关闭Borker
     sh bin/mqshutdown broker
     # 关闭NameServer
     sh bin/mqshutdown namesrv
    
  7. 测试发消息
    1. 配置NAMESRV_ADDR到环境变量里
      1. cd /usr/local/rocketmq-4.7.0
      2. 执行export NAMESRV_ADDR=127.0.0.1:9876
    2. 生产者发送消息到mq

       //输入命令,当前在
       sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
      
    3. 消费者获取消息

       //输入命令
       sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
      

管理控制台安装

  1. 管理控制台也是一个springboot项目
    1. 在github查找rocketmq-externals项目,然后克隆下来
    2. rocketmq-console已更名为rocketmq-dashboard并转让了新的回购协议。因此,下面的rocketmq-console 替换成 rocketmq-dashboard
  2. 将该项目克隆到指定目录下

     cd /usr/local/src 
     git clone https://gitee.com/heshengjun/rocketmq-externals.git
    
  3. 编译源码
    1. 进入到管理控制台项目

       cd rocketmq-externals/rocketmq-console
      
    2. 编译生成可执行程序

       mvn clean package -Dmaven.test.skip=true
       //执行完会生成一个target文件,里面有个rocketmq-console-ng-1.0.1.jar可执行程序
      
    3. 将该可执行程序放到一个指定目录

      1. /usr/local/下新建一个apps文件夹,然后将rocketmq-console-ng-1.0.1.jar 复制进去
  4. 启动管理控制台
    1. 启动rocket服务, 在启动管理控制台之前, 必须先启动NameServer
    2. 在启动目录(包含rocketmq-console-ng-1.0.1.jar的目录下/usr/local/apps)创建一个application.properties,配置信息如下:

       #当前服务的端口
       server.port=9999
       #地址为上面mq的nameserver的地址端口
       rocketmq.config.namesrvAddr=127.0.0.1:9876
      
    3. 启动管理控制台

       nohup java -jar rocketmq-console-ng-1.0.1.jar  &
      
    4. 在浏览器输入:http://localhost:9999/ 能看到管理端页面
      1. 注意: 要先启动mq的nameserver、broker
      2. 这个页面主要用于查看有哪些生产者生产了多少消息,有哪些消费者消费了多少消息

简单使用

  1. 新建一个项目(new Module)-》选择Maven->next->name: 13-rocketmq-demo Atrifact coordinates展开:groupid: com.zh.demo Atrifactid:rocketmq-demo -> finished
  2. pom文件添加mq依赖

     <dependency>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-client</artifactId>
         <version>4.7.0</version>
     </dependency>
    
  3. src下新建包名com.zh.demo._01_hello
  4. 新建消息生产者测试类:ProducerDemo

     package com.zh.demo._01_hello;
     public class ProducerDemo {
         public static void main(String[] args) throws Exception {
             //1 创建一个生成者对象:参数为:生产者组
             DefaultMQProducer producer = new DefaultMQProducer("group01");
             //2 设置NameServer的地址
             producer.setNamesrvAddr("127.0.0.1:9876");
             //3 启动生产者
             producer.start();
             //4 创建消息Message:第一个参数为topic(字符串),第二个参数为消息体(字节数组)
             Message msg = new Message("01_hello", "hello, NBA".getBytes());
             //5 发送消息
             SendResult result = producer.send(msg);
             System.out.println(JSON.toJSONString(result));
             //6 关闭生产者
             producer.shutdown();
         }
     }
    
  5. 新建消息消费者类
    1. 注意:消费者消费消息中心的消息有2种方式
      1. 消息中心主动推送给消费者(最常用)
      2. 消费者主动到消息中心拉取
     package com.zh.demo._01_hello;
     public class ConsumerDemo {
         public static void main(String[] args) throws Exception {
             //1 创建一个消费者对象,使用DefaultMQPushConsumer类,代表是消息中心推送给消费者
             //使用DefaultLitePullConsumer类,主动到消息中心拉取的方式
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
             //2 设置NameServerAddr
             consumer.setNamesrvAddr("127.0.0.1:9876");
             //3 先订阅消息主题(topic),采取消息中心直接推动给消费者的方式,第二个参数表示过滤消息,*代表所有消息都不过滤
             consumer.subscribe("01_hello","*");
             //4 注册监听消息的处理
             consumer.registerMessageListener(new MessageListenerConcurrently() {
                 @Override
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                     for (MessageExt msg : msgs) {
                         System.out.println("消费数据成功:"+new String(msg.getBody()));
                     }
                     //只有返回给消息中心,消费成功,消息中心才不会再次推送这个消息,否则还会继续推动这个消息
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 }
             });
             //5 启动消费者
             consumer.start();
         }
     }
    
  6. 注意:
    1. 上面的生产者、消费者地址都是写死的,在实际开发中通常分为测试、生产地址,因此不能写死
    2. 解决方法有2种:
      1. 配置环境变量:(window、mac都可以配置)

         //windows配置环境变量(系统变量)
         NAMESRV_ADDR 192.168.48.102:9876 
                    
         //Mac配置
         export NAMESRV_ADDR=localhost:9876 
        
      2. 通过配置JVM参数配置

         // 在VM Options
         NAMESRV_ADDR=localhost:9876
        
    3. 配置完成,上面的 “设置NameServerAddr“代码就可以删除,启动的时候到环境变量去找,或者启动参数中查找

生产者发送消息

发送消息的三种方式

  1. 同步发送消息
    1. 同步, 指的是在发送数据到消息中间件的时候, 需要及时的返回一个结果到发送者
    2. 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
  2. 异步发送消息
    1. 在异步发送消息的时候, 我们把数据发送到消息中心, 此时我们不会等待消息中心的返回结果, 而是程序继续往下执行, 当有结果返回的时候, 通过异步通知的方式告诉消息生产者
    2. 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
  3. 一次性发送消息
    1. 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
    2. 发送出去不要结果

代码举例

  1. 异步发送
    1. 由于是异步的,必须消息响应完成才能关闭线程
     package com.zh.demo._02_async;
     public class ProducerDemo {
         public static void main(String[] args) throws  Exception{
             //1 创建一个生成者对象
             DefaultMQProducer producer = new DefaultMQProducer("async");
             //2 设置NameServer的地址;这里不用写,会自动到环境变量去找,或者启动参数中查找
             //3 启动生产者
             try {
                 producer.start();
             } catch (MQClientException e) {
                 e.printStackTrace();
             }
             //4 创建消息Message
             //5 发送消息
             //创建一个计数器
             final CountDownLatch2 countDownLatch = new CountDownLatch2(5);
             for (int i = 0; i < 5; i++) {
                 Message msg = new Message("02_async", "hello, NBA".getBytes());
                 try {
                     producer.send(msg, new SendCallback() {
                         @Override
                         public void onSuccess(SendResult sendResult) {
                             System.out.println(Thread.currentThread().getName());
                             if(sendResult.getSendStatus()== SendStatus.SEND_OK){
                                 System.out.println("消息发送成功"+ JSON.toJSONString(sendResult));
                             }
                             countDownLatch.countDown();
                         }
                         //异常处理
                         @Override
                         public void onException(Throwable e) {
                             e.printStackTrace();
                             countDownLatch.countDown();
                         }
                     });
                 } catch (MQClientException e) {
                     //补救措施  把对应的消息先保存到另外一个地方 MySQL, 自己到时候重新触发 发送消息
                     e.printStackTrace();
                 } catch (RemotingException e) {
                     //补救措施
                     e.printStackTrace();
                 } catch (InterruptedException e) {
                     //补救措施
                     e.printStackTrace();
                 }
             }
             // 如果使用异步操作, 需要等待接收完所有的异步返回结果之后, 再去关闭主线程
             countDownLatch.await();// 等待计数器归0
             //6 关闭生产者
             producer.shutdown();
        
         }
     }
    
  2. 一次性发送

     package com.zh.demo._03_oneway;
     public class ProducerDemo {
         public static void main(String[] args) throws Exception {
             //1 创建一个生成者对象
             DefaultMQProducer producer = new DefaultMQProducer("03_oneway");
             //2 设置NameServer的地址;这里不用写,会自动到环境变量去找,或者启动参数中查找
             //3 启动生产者
             producer.start();
             //4 创建消息Message
             //5 发送消息
             //创建一个计数器
             for (int i = 0; i < 5; i++) {
                 // Tag: 消息标签, 用来进行消息分类  支付消息--> wx支付,zfb支付
                 // Key: 业务消息  订单消息: key: 订单id/订单号
                 Message msg = new Message("03_oneway","TagB",i+"","hello, NBA".getBytes());
                 producer.sendOneway(msg);//日志, 大数据应用
             }
             //6 关闭生产者
             producer.shutdown();
         }
     }
    

发送消息补充

  1. 生产者组
    1. 创建生产者时需要传递一个参数producerGroup:生产者组
    2. 同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
    3. 如果生产者发送完消息到消息中心,消息中心也需要发送消息给生产者,但是此时如果之前的那个生产者挂了,就需要发送到其他同类的生产者回调
    4. 如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
  2. 消息封装(就是给消息设置一定的特征,让指定消费者去消费)
    1. Tag
      1. 用来给消息进行标记, 可以通过Tag对消息进行分类, 把不同类型的消息交给不同的消费者进行消费
    2. Key
      1. Key: 可以设置消息的一个唯一ID, 用于区分每个消息的标志, 业务ID
      2. 并且在管理控制台,可以通过Key进行消息的查询跟踪
     // Tag: 消息标签, 用来进行消息分类  支付消息--> wx支付,zfb支付
     // Key: 业务消息  订单消息: key: 订单id/订单号
     Message msg = new Message("03_oneway","TagB",i+"","hello, NBA".getBytes());
    

消费者消费消息

两种消费方式

  1. 拉式消费
    1. 消费者到消息中心拉取消息,进行消费
    2. 最早期使用DefaultMQPullConsumer(PULL 模式)类来进行消息拉取,但是该类的 API 太底层,使用起来及其不方便,RocketMQ 官方设计者也注意到这个问题,为此在 RocketMQ 4.6.0 版本中引入了 PULL 模式的另外一个实现类 DefaultLitePullConsumer,即从 4.6.0 版本后,DefaultMQPullConsumer 已经被标记为废弃,故接下来将重点介绍 DefaultLitePullConsumer,并探究如何在实际中运用它解决相关问题。
     public class PullConsumer {
    
         public static void main(String[] args) throws Exception {
             //1 创建一个消费者对象
             DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("type");
             //2 设置NameServerAddr
             // 生产环境, 测试环境 NameServer 不一样
             //3 先订阅消息主题
             consumer.subscribe("04_type","*");
              //每次拉取的消息量,默认是10
             consumer.setPullBatchSize(10);
             //间隔多久拉取一次,也可以理解为每5s阻塞一次,默认5s
             consumer.setPollTimeoutMillis(5000);
             //设置拉取的线程数,默认20
             consumer.setPullThreadNums(20);
                
             //5 启动消费者
             consumer.start();
             while (true){
                 //5s钟拉取一下
                 List<MessageExt> msgs = consumer.poll();//拉取间隔时间5s
                 System.out.println(msgs);
                 //手动提交
                 consumer.commitSync();
             }
         }
     }
    
  2. 推式消费
    1. 消息中心主动推送给消费者,消费者需要提前订阅好消息中心的消费
     /**
      * 推式消费 消息中心 主动把消息推送给  消费者
      */
     public class PushConsumerDemo {
         public static void main(String[] args) throws Exception {
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("type");
             consumer.setNamesrvAddr("192.168.48.102:9876");
             consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
             //订阅主题, 后面可以指定表达式Tag, 根据tag进行消息过滤
             consumer.subscribe("01_hello","*");
             //注册消息消费监听 有消息就会触发consumeMessage方法
             consumer.registerMessageListener(new MessageListenerConcurrently() {
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                     System.out.println("接收消息线程: "+Thread.currentThread().getName());
                     for (MessageExt msg : msgs) {
                         System.out.println("接收到的消息:"+new String(msg.getBody()));
                     }
                     //确定消息消费成功
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 }
             });
             //启动消费者
             consumer.start();
         }
     }
    

2种消费模式

  1. 集群模式
    1. 多个消费者消费一个队列,每个消费者消费不同的消息,提高消费能力
    2. 即:对于同一个消费者组里面的多个消费者, 每个消费者消费的消息都是不一样的, 相当于消费者的负载均衡
    3. 通过设置消费者的模式

       consumer.setMessageModel(MessageModel.CLUSTERING);
      
  2. 广播模式
    1. 多个消费者消费相同的消息,一条消息被n个消费者消费
    2. 即:每个消费者都会接受全量的消息, 所有消费者消费的数据都是一样的
    3. 一般用于对于消息需要多个其他业务进行处理
    4. 通过设置消费者的模式

       consumer.setMessageModel(MessageModel.BROADCASTING);
      

消息消费的位置

  1. 在指定消费的pos位置的时候, 会优先获取服务端记录的上次消费点, 所以该参数只有在服务端没有对应的消费者的记录的时候有效,一般情况是第一次启动的消费者有效
  2. 三种
    1. CONSUME_FROM_FIRST_OFFSET: 从最开始的位置消费, 会消费该Topic下面所有的有效的数据, 过期的数据会删除掉
    2. CONSUME_FROM_LAST_OFFSET
      1. 如果该Topic的数据都是最近的数据, 没有过期数据, 则从最开始的位置消费
      2. 如果该Topic的数据存在过期的数据, 则从最后的位置开始消费, 只会消费新加入的数据
    3. CONSUME_FROM_TIMESTAMP
      1. 根据指定的时间戳进行消费,配合consumer.setConsumeTimestamp("20200612083300");从指定的时间开始消费, 如果不指定, 则默认从半个小时前的数据开始消费
  3. 代码举例:

     public static void main(String[] args) throws Exception {
         //1 创建一个消费者对象
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("05_model");
         //消息模式设置为集群
         consumer.setMessageModel(MessageModel.CLUSTERING);
         consumer.subscribe("05_model","*");
         // 根据指定的时间去消费
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
         consumer.setConsumeTimestamp("20200623210000");
         //4 注册监听消息的处理
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                 for (MessageExt msg : msgs) {
                     System.out.println("消费Consumer001 数据成功:"+new String(msg.getBody())+"msg.getQueueId():"+msg.getQueueId());
                 }
                 // 也会改变消费者的偏移量,  但是数据会放到重试队列
                 return ConsumeConcurrentlyStatus.RECONSUME_LATER;
             }
         });
         System.out.println("消费Consumer001启动成功");
         //5 启动消费者
         consumer.start();
     }
    

消息确认

  1. 拉式消费(DefaultLitePullConsumer)
    1. 可以通过consumer.setAutoCommit(false);设置是否自动提交, 如果设置为手动提交, 需要使用consumer.commitSync();方法进行手动提交

       //1 创建一个消费者对象
       DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("05_model_002");
       //2 设置NameServerAddr
       // 生产环境, 测试环境 NameServer 不一样
       //3 先订阅消息主题
       consumer.subscribe("05_model","*");
       //取消自动提交
       consumer.setAutoCommit(false);
       //5 启动消费者
       consumer.start();
       while (true){
           List<MessageExt> msgs = consumer.poll();//拉取间隔时间5s
           System.out.println(msgs);
           //手动提交
           consumer.commitSync();
       }
      
  2. 推式消费(DefaultMQPushConsumer)
    1. 通过返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态表示消费成功
    2. 返回ConsumeConcurrentlyStatus.RECONSUME_LATER 表示消费不成功, 会放入到重试队列
      1. 默认重试采用服务端重试: 重试次数:16次
    3. 代码示例如上面示例
Table of Contents