Java进阶-RocketMQ进阶

RocketMQ高级消息处理应用

顺序消息

  1. 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序
  2. 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次(就要保证生产者、消费者只有一个)发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
  3. 全局有序 案例
    1. 生产者只有一个、消费者也只有一个(如果有多个,不能保证多个生产者有序发送,不能保证多个消费者有序消费),发送和消费参与的queue只有一个,则是全局有序;
    2. 比如对于MySQL的binLog的日志分发(记录当前数据的所有sql执行命令), 就需要使用全局有序的操作
  4. 局部有序 案例
    1. 一个订单的顺序流程是:创建、支付、发货、完成。订单号相同的消息会被先后发送到同一个队列中;消费时,同一个OrderId获取到的肯定是同一个队列。
    2. 比如:n个订单的流程数据同时发送,每个流程相同的订单号放到一个队列,这样就保证n个队列每个队列里面的流程是有序的,即同一个订单的不同流程(创建、支付、发货、完成)放到一个队列中,因为这些流程产生的顺序一定是有先后顺序的,因此这个订单的顺序放到同一个队列里面也是顺序存储的
  5. 代码举例:(局部有序)
    1. 生产者

       public class Producer {
           public static void main(String[] args) throws Exception {
               DefaultMQProducer producer = new DefaultMQProducer("06_order");
               producer.start();
               String[] tags = new String[]{"TagA", "TagC", "TagD"};
               // 订单列表
               List<OrderStep> orderList = new Producer().buildOrders();
              
               Date date = new Date();
               SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
               String dateStr = sdf.format(date);
               for (int i = 0; i < orderList.size(); i++) {
                   // 加个时间前缀,orderList.get(i) 对象会自动调用tostring转化为字符串
                   String body = dateStr + " Hello RocketMQ " + orderList.get(i);
                   Message msg = new Message("06_order", tags[i % tags.length], "KEY" + i, body.getBytes());
                   //核心思想,可以选择队列
                   //SendResult send(Message var1, MessageQueueSelector var2, Object var3):参数1:具体消息内容 参数2:队列选择器,用于指定当前消息进入哪个队列 参数3:该参数可以传递到队列选择器中,用于进行选择器内部判断
                   SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                       @Override
                       //参数1mqs:当前topic下的所有队列集合 参数2 msg:消息内容 参数3 arg:
                       public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                           //这个arg参数就是,下面传的订单id
                           Long id = (Long) arg;  //根据订单id选择发送queue
                           //根据订单id进行队列数取余运算,实现将订单id相同的消息发送到同一个队列,但是不是每一个队列只放一套订单数据,可以放多套,只要保证一套订单的消息放到同一个队列就行
                           //一个队列就相当于是一个管道,消息放在里面只能串行取出,这样就能保证顺序消费
                           long index = id % mqs.size();
                           return mqs.get((int) index);
                       }
                   }, orderList.get(i).getOrderId());//订单id,会传递到回调里面的select方法中作为参数
              
                   System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                           sendResult.getSendStatus(),
                           sendResult.getMessageQueue().getQueueId(),
                           body));
               }
              
               producer.shutdown();
           }
              
           /**
            * 订单的步骤
            */
           private static class OrderStep {
               private long orderId;
               private String desc;
              
               public long getOrderId() {
                   return orderId;
               }
              
               public void setOrderId(long orderId) {
                   this.orderId = orderId;
               }
              
               public String getDesc() {
                   return desc;
               }
              
               public void setDesc(String desc) {
                   this.desc = desc;
               }
              
               @Override
               public String toString() {
                   return "OrderStep{" +
                           "orderId=" + orderId +
                           ", desc='" + desc + '\'' +
                           '}';
               }
           }
              
           /**
            * 生成模拟订单数据
            * 
            */
           private List<OrderStep> buildOrders() {
               List<OrderStep> orderList = new ArrayList<OrderStep>();
              
               OrderStep orderDemo = new OrderStep();
               orderDemo.setOrderId(15103111039L);
               orderDemo.setDesc("创建");
               orderList.add(orderDemo);
              
               orderDemo = new OrderStep();
               orderDemo.setOrderId(15103111065L);
               orderDemo.setDesc("创建");
               orderList.add(orderDemo);
              
               orderDemo = new OrderStep();
               orderDemo.setOrderId(15103111039L);
               orderDemo.setDesc("付款");
               orderList.add(orderDemo);
              
               orderDemo = new OrderStep();
               orderDemo.setOrderId(15103117235L);
               orderDemo.setDesc("创建");
               orderList.add(orderDemo);
              
               orderDemo = new OrderStep();
               orderDemo.setOrderId(15103111065L);
               orderDemo.setDesc("付款");
               orderList.add(orderDemo);
              
               orderDemo = new OrderStep();
               orderDemo.setOrderId(15103117235L);
               orderDemo.setDesc("付款");
               orderList.add(orderDemo);
              
               orderDemo = new OrderStep();
               orderDemo.setOrderId(15103111065L);
               orderDemo.setDesc("完成");
               orderList.add(orderDemo);
              
               orderDemo = new OrderStep();
               orderDemo.setOrderId(15103111039L);
               orderDemo.setDesc("发货");
               orderList.add(orderDemo);
              
               orderDemo = new OrderStep();
               orderDemo.setOrderId(15103117235L);
               orderDemo.setDesc("完成");
               orderList.add(orderDemo);
              
               orderDemo = new OrderStep();
               orderDemo.setOrderId(15103111039L);
               orderDemo.setDesc("完成");
               orderList.add(orderDemo);
              
               return orderList;
           }
       }
      
    2. 消费者

       public class Consumer {
      
          public static void main(String[] args) throws Exception {
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
              consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
              consumer.subscribe("06_order", "TagA || TagC || TagD");
              //MessageListenerOrderly:顺序监听器,保证每一个队列只有一个线程来消费
              consumer.registerMessageListener(new MessageListenerOrderly() {
                  Random random = new Random();
                  @Override
                  public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                      // 设置消息自动提交, 不然就算消费了消息, 下次消费的时候, broker端并不会记录对应的消费位置
                      context.setAutoCommit(true);
                      for (MessageExt msg : msgs) {
                          // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                          System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                      }
                      try {
                          //模拟业务逻辑处理中...
                          TimeUnit.SECONDS.sleep(random.nextInt(10));
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                      return ConsumeOrderlyStatus.SUCCESS;
                  }
              });
              
              consumer.start();
              
              System.out.println("Consumer Started.");
          }
       }
      

延时消息

  1. 生产者发送的消息,消费者会延迟消费
  2. 原理:生产者先将消息发送到延迟队列中,一旦延迟时间到,消息再从延迟队列发送到正常队列中,消费者即时消费
  3. 应用场景
    1. 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
  4. 使用限制
    1. 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
    2. 如果想修改,则在源码中org/apache/rocketmq/store/config/MessageStoreConfig.java

       private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
      
    3. 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java
  5. 代码举例
    1. 消费者

       public class Consumer {
          public static void main(String[] args) throws Exception {
             // 实例化消费者
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay");
             // 订阅Topics
             consumer.subscribe("07_delay", "*");
             //设置消费的线程最大数,不设置,默认为20
             //consumer.setConsumeThreadMax();
             // 注册消息监听者
             // MessageListenerConcurrently 这里使用的是并发监听器,多个线程可以消费一个队列的数据
             consumer.registerMessageListener(new MessageListenerConcurrently() {
                 @Override
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                     for (MessageExt message : messages) {
                         // Print approximate delay time period
                         System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                     }
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 }
             });
             // 启动消费者
             consumer.start();
              System.out.println("消费者启动成功");
         }
       }
      
    2. 生产者

       public class Producer {
          public static void main(String[] args) throws Exception {
             // 实例化一个生产者来产生延时消息
             DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
             // 启动生产者
             producer.start();
             int totalMessagesToSend = 100;
             for (int i = 0; i < totalMessagesToSend; i++) {
                 Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
                 // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
                 message.setDelayTimeLevel(3);
                 // 发送消息
                 producer.send(message);
             }
              // 关闭生产者
             producer.shutdown();
         }
       }
      

过滤消息

  1. Tag消息过滤
    1. 消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用

       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
       consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");  
      
  2. SQL消息过滤
    1. 可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。
    2. 基本语法
      1. RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

         - 数值比较,比如: >,>=,<,<=,BETWEEN,=; 
         - 字符比较,比如: =,<>,IN; 
         -  IS NULL  或者  IS NOT NULL; 
         - 逻辑符号  AND,OR,NOT; 
                    
         常量支持类型为:
                    
         - 数值,比如: 123,3.1415; 
         - 字符,比如: 'abc',必须用单引号包裹起来; 
         -  NULL ,特殊的常量
         - 布尔值, TRUE  或  FALSE 
        
      2. 只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

         public void subscribe(finalString topic, final MessageSelector messageSelector)
        
    3. 使用样例
      1. 配置参数
        1. 默认情况下是不支持属性过滤的, 需要通过配置参数开启:enablePropertyFilter=true
        2. 在配置文件中配置对应的参数(conf/broker.conf)

           brokerClusterName = DefaultCluster
           brokerName = broker-a
           namesrvAddr=192.168.48.102:9876
           brokerId = 0
           deleteWhen = 04
           fileReservedTime = 48
           brokerRole = ASYNC_MASTER
           flushDiskType = ASYNC_FLUSH
           enablePropertyFilter=true
          
          1. nohup bin/mqbroker -c conf/broker.conf & 通过-c 指定配置文件路径
      2. 生产者
        1. 发送消息时,你能通过putUserProperty来设置消息的属性
         public class Producer {
             public static void main(String[] args) throws Exception {
                 // 实例化一个生产者来产生过滤消息
                 DefaultMQProducer producer = new DefaultMQProducer("08_filter");
                 // 启动生产者
                 producer.start();
                 Message msg = new Message("08_filter", "TagA", "9527", "hello, rokcetmq 007".getBytes());
                 msg.putUserProperty("age", "28");
                 msg.putUserProperty("score", "70");
                 producer.send(msg);
                 // 关闭生产者
                 producer.shutdown();
             }
         }
        
      3. 消费者

         public class Consumer {
            public static void main(String[] args) throws Exception {
               // 实例化消费者
               DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("08_filter");
               // 订阅Topics
               consumer.subscribe("08_filter", MessageSelector.bySql(" age < 30 and score > 80"));
               // 注册消息监听者
               consumer.registerMessageListener(new MessageListenerConcurrently() {
                   @Override
                   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                       for (MessageExt message : messages) {
                           // Print approximate delay time period
                           System.out.println(message.getProperties());
                           System.out.println("Receive message[msgId=" + message.getMsgId() + "] :"+new String(message.getBody()));
                       }
                       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                   }
               });
               // 启动消费者
               consumer.start();
                System.out.println("消费者启动成功");
           }
         }
        

批量发送消息(了解)

  1. 批量发送消息能显著提高传递消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
  2. 使用场景:单个消息非常小,为了节省网络消耗(没发送一条消息都要消耗网络)。

     List<Message> messages = new ArrayList<>();
     //将消息包装到一个集合中
     for (int i = 0; i < 5; i++) {
         Message msg = new Message("03_oneway","TagB",i+"","hello, NBA".getBytes());
         messages.add(msg);
     }
     //批量发送
     producer.sendOneway(messages);
    

SpringBoot集成使用

  1. 新建2个springboot项目
    1. 14-rocketmq-spboot-producer: File-new Module->Spring Initializr ->next ->group:com.zh.demo Artifact:14-rocketmq-spboot-producer-》next-》勾选Lombok、Spring web-》next
    2. 同理创建14-rocketmq-spboot-consumer项目
  2. 导入依赖(2个都导入)

     <dependency>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-spring-boot-starter</artifactId>
         <version>2.1.0</version>
     </dependency>
    
  3. 生产者
    1. 添加配置参数

       //nameserver地址
       rocketmq.name-server=192.168.48.102:9876
       //生产者组
       rocketmq.producer.group=my_group
       //当前服务端口
       server.port=9999
      
    2. 代码

       @RestController
       public class HelloController {
           @Autowired
           private RocketMQTemplate  rocketMQTemplate;
           private String topic="01_boot_hello";
           // 需求: 把接收的msg消息发送到mq的topic中
           @RequestMapping("01_hello")
           public String hello(String msg) throws Exception{
               System.out.println("msg = " + msg);
               //发送消息
               SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
               return sendResult.getSendStatus().toString();
           }
       }
      
  4. 消费者
    1. 添加配置参数

       rocketmq.name-server=127.0.0.1:9876
       server.port=7777
      
    2. 消费者

       //消费者
       //1 必须是容器组件
       @Component
       //类上要加上@RocketMQMessageListener注解
       @RocketMQMessageListener(
           //当前的消费组名称
           consumerGroup = "test-9657",
           //要监听的topic
           topic = "01_boot_hello",
           //集群模式
           messageModel = MessageModel.CLUSTERING
       )
       public class HelloConsumer implements RocketMQListener<MessageExt> {
           //回调方法
           @Override
           public void onMessage(MessageExt ext) {
               System.out.println("接收到的消息: message = " + new String(ext.getBody()));
               System.out.println("ext = " + JSON.toJSONString(ext));
           }
       }
      
  5. 注意:
    1. 实际开发建议:一个应用最好只有一个Topic,对于消息类型可以使用Tag标记进行区分
    2. 一个项目中可以有多个消费者进行监听,但是每个消费者的组名必须唯一

       //当前的消费组名称
       consumerGroup = "test-9657",
      

生产消息类型

  1. 同步消息、异步消息、一次性消息

     @RestController
     public class TypeController {
        
         @Autowired
         private RocketMQTemplate rocketMQTemplate;
        
         private String topic="02_boot_type";
        
         @RequestMapping("02_type")
         public String sendMsg(String msg){
             //1 同步发送
             SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
             // 2 异步发送
             rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
                 @Override
                 public void onSuccess(SendResult sendResult) {
                     System.out.println("异步消息发送成功");
                 }
                 @Override
                 public void onException(Throwable throwable) {
                     System.out.println("异步消息发送失败");
                 }
             });
             //3 一次性消息
             rocketMQTemplate.sendOneWay(topic,msg);
             return "success";
         }
     }
    

拉式消费

  1. 在推式消费基础上,消息监听实现RocketMQPushConsumerLifecycleListener ,重写prepareStart方法

     @Component
     @RocketMQMessageListener(
         consumerGroup = "test-9657",
         topic = "01_boot_hello",
         messageModel = MessageModel.CLUSTERING
     )
     /*
      * RocketMQPushConsumerLifecycleListener:当@RocketMQMessageListener中的配置不⾜以满⾜我们的需求时,可以实现该接⼝直接更改消费者类DefaultMQPushConsumer的配置
      */
     public class HelloConsumer3 implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
         //回调方法
         @Override
         public void onMessage(MessageExt ext) {
         }
         @Override
         public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
             //设置每次消息拉取的时间间隔 单位 毫秒
             defaultMQPushConsumer.setPullInterval(1000);
             //最小消费线程池数
             defaultMQPushConsumer.setConsumeThreadMin(5);
             //最大消费线程池数
             defaultMQPushConsumer.setConsumeThreadMax(15);
             //每次消费(即将多条消息合并为List消费)的最大消息数目,消费线程每次消费的最大消息的数量,默认值为1
             defaultMQPushConsumer.setConsumeMessageBatchMaxSize(5);
             //重复消费次数,用于失败后重试,默认-1
             defaultMQPushConsumer.setMaxReconsumeTimes(8);
             //设置每个队列每次拉取的最大消费数,即拉取线程每次从broker拉取的消息量
             defaultMQPushConsumer.setPullBatchSize(10);
             defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                     System.out.println("接收消息线程: "+Thread.currentThread().getName());
                     for (MessageExt msg : msgs) {
                         System.out.println("接收到的消息:"+new String(msg.getBody()));
                     }
                     //确定消息消费成功
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 }
             });
         }
     }
    
  2. pullBatchSize的大小受制于broker配置文件中 maxTransferCountOnMessageInMemory 参数的设置,该参数默认设置为 32,也即是每次从服务端拉取的最大的数量不能超过32,因此即使设置 pullBatchSize超过32,最后也只返回32。因此,若要每次拉取的消息量超过32,可以修改broker配置文件里该参数的值,并重启broker服务。
  3. 每个消费线程消费批次消费的消息数量,consumeMessageBatchMaxSize 也跟pullBatchSize相关,消费线程实际上每次消费的消息数量不会大于 pullBatchSize,具体可以查看ConsumeMessageConcurrentlyService.submitConsumeRequest(),消费任务的提交。
  4. 因此,实际上 consumeMessageBatchMaxSize <= pullBatchSize <= maxTransferCountOnMessageInMemory

消费模式

  1. 集群模式

     @Component
     @RocketMQMessageListener(
             consumerGroup = "02_cluster_model",
             topic = "02_cluster_model",
             messageModel = MessageModel.CLUSTERING
     )
     public class ClusterModelConsumer implements RocketMQListener<String> {
         @Override
         public void onMessage(String message) {
             System.out.println("集群模式消费消息: "+message);
         }
     }
    
  2. 广播模式

     //对于广播模式, 实时消费消息, 在广播模式消费者启动之前的消息, 是接收不到
     // 2 对于广播模式, 发送失败的消息不会重试
     @Component
     @RocketMQMessageListener(
             consumerGroup = "02_broadcasting_model",
             topic = "02_broadcasting_model",
             messageModel = MessageModel.BROADCASTING
     )
     public class BrandCastModelConsumer implements RocketMQListener<String> {
         @Override
         public void onMessage(String message) {
             System.out.println("广播模式消费消息: "+message);
         }
     }
    

延时消息

@RestController
public class DelayController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private String topic="03_boot_delay";
    // 需求: 把接收的msg消息发送到mq的topic中
    @RequestMapping("03_delay")
    public String sendMsg(String msg) throws Exception {
       //设置延迟消息
        //方式一: 直接使用原生的api
        DefaultMQProducer producer = rocketMQTemplate.getProducer();
        Message message = new Message(topic, "TagA", "9527", msg.getBytes());
        message.setDelayTimeLevel(3);
        //在实际工作中, 确保消息可靠性, 捕获对应的异常
        producer.send(message);
        //方式二: 找可以使用延迟级别的API
        org.springframework.messaging.Message<String> msg2 = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.syncSend(topic,msg2,3000,3);
        return "success";
    }
}

设置消息标签

//使用功能tag对消息进行分类
@RestController
public class TagController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private String topic="04_boot_tag";
    private String tag="TagA";
    @RequestMapping("04_tag")
    public String sendMsg(String msg){
        rocketMQTemplate.syncSend(topic+":"+tag,msg);
        return "Success";
    }
}

设置消息的Key

@RestController
public class KeyController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private String topic="05_boot_key";
    private String tag="TagA";
    private String key="9627";
    @RequestMapping("05_key")
    public String sendMsg(String msg){
        org.springframework.messaging.Message<String> msg2 = MessageBuilder.withPayload(msg)
                .setHeader(MessageConst.PROPERTY_KEYS,key).build();
        rocketMQTemplate.syncSend(topic,msg2);
        return "Success";
    }
}

自定义属性设置

@RestController
public class PropsController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private String topic="06_boot_props";
    private String tag="TagA";
    private String key="9627";
    @RequestMapping("05_key")
    public String sendMsg(String msg){
        HashMap map = new HashMap<>();
        //通过Map添加自定义属性
        map.put("name","rose");
        rocketMQTemplate.convertAndSend(topic,msg,map);
        return "Success";
    }
}

消息过滤

@Component
@RocketMQMessageListener(
        consumerGroup = "02_cluster_model",
        topic = "02_cluster_model",
        messageModel = MessageModel.CLUSTERING,
        //tag过滤
//        selectorType = SelectorType.TAG;
//        selectorExpression =  "TagA";
        //sql过滤
        selectorType = SelectorType.SQL92,
        selectorExpression = " name ='rose' or age>18 "
)
public class FilterConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("集群模式消费消息: "+message);
    }
}
  1. 过滤设置:需要开启broker的支持用户属性配置 enablePropertyFilter=true

发送消息的方式

  1. 直接使用rocketMQTemplate
  2. 使用DefaultMQProducer对象
  3. 使用Spring的Message接口
Table of Contents