Java进阶-RocketMQ进阶
RocketMQ高级消息处理应用
顺序消息
- 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
- 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次(就要保证生产者、消费者只有一个)发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
- 全局有序 案例
- 生产者只有一个、消费者也只有一个(如果有多个,不能保证多个生产者有序发送,不能保证多个消费者有序消费),发送和消费参与的queue只有一个,则是全局有序;
- 比如对于MySQL的binLog的日志分发(记录当前数据的所有sql执行命令), 就需要使用全局有序的操作
- 局部有序 案例
- 一个订单的顺序流程是:创建、支付、发货、完成。订单号相同的消息会被先后发送到同一个队列中;消费时,同一个OrderId获取到的肯定是同一个队列。
- 比如:n个订单的流程数据同时发送,每个流程相同的订单号放到一个队列,这样就保证n个队列每个队列里面的流程是有序的,即同一个订单的不同流程(创建、支付、发货、完成)放到一个队列中,因为这些流程产生的顺序一定是有先后顺序的,因此这个订单的顺序放到同一个队列里面也是顺序存储的
- 代码举例:(局部有序)
-
生产者
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("06_order"); producer.start(); String[] tags = new String[]{"TagA", "TagC", "TagD"}; // 订单列表 List<OrderStep> orderList = new Producer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 0; i < orderList.size(); i++) { // 加个时间前缀,orderList.get(i) 对象会自动调用tostring转化为字符串 String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("06_order", tags[i % tags.length], "KEY" + i, body.getBytes()); //核心思想,可以选择队列 //SendResult send(Message var1, MessageQueueSelector var2, Object var3):参数1:具体消息内容 参数2:队列选择器,用于指定当前消息进入哪个队列 参数3:该参数可以传递到队列选择器中,用于进行选择器内部判断 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override //参数1mqs:当前topic下的所有队列集合 参数2 msg:消息内容 参数3 arg: public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //这个arg参数就是,下面传的订单id Long id = (Long) arg; //根据订单id选择发送queue //根据订单id进行队列数取余运算,实现将订单id相同的消息发送到同一个队列,但是不是每一个队列只放一套订单数据,可以放多套,只要保证一套订单的消息放到同一个队列就行 //一个队列就相当于是一个管道,消息放在里面只能串行取出,这样就能保证顺序消费 long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId());//订单id,会传递到回调里面的select方法中作为参数 System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } /** * 订单的步骤 */ private static class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } } /** * 生成模拟订单数据 * */ private List<OrderStep> buildOrders() { List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("发货"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }
-
消费者
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("06_order", "TagA || TagC || TagD"); //MessageListenerOrderly:顺序监听器,保证每一个队列只有一个线程来消费 consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 设置消息自动提交, 不然就算消费了消息, 下次消费的时候, broker端并不会记录对应的消费位置 context.setAutoCommit(true); for (MessageExt msg : msgs) { // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序 System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模拟业务逻辑处理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
-
延时消息
- 生产者发送的消息,消费者会延迟消费
- 原理:生产者先将消息发送到延迟队列中,一旦延迟时间到,消息再从延迟队列发送到正常队列中,消费者即时消费
- 应用场景
- 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
- 使用限制
- 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
-
如果想修改,则在源码中
org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
- 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码
SendMessageProcessor.java
- 代码举例
-
消费者
public class Consumer { public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay"); // 订阅Topics consumer.subscribe("07_delay", "*"); //设置消费的线程最大数,不设置,默认为20 //consumer.setConsumeThreadMax(); // 注册消息监听者 // MessageListenerConcurrently 这里使用的是并发监听器,多个线程可以消费一个队列的数据 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); System.out.println("消费者启动成功"); } }
-
生产者
public class Producer { public static void main(String[] args) throws Exception { // 实例化一个生产者来产生延时消息 DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); // 启动生产者 producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel) message.setDelayTimeLevel(3); // 发送消息 producer.send(message); } // 关闭生产者 producer.shutdown(); } }
-
过滤消息
- Tag消息过滤
-
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
-
- SQL消息过滤
- 可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。
- 基本语法
-
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如: >,>=,<,<=,BETWEEN,=; - 字符比较,比如: =,<>,IN; - IS NULL 或者 IS NOT NULL; - 逻辑符号 AND,OR,NOT; 常量支持类型为: - 数值,比如: 123,3.1415; - 字符,比如: 'abc',必须用单引号包裹起来; - NULL ,特殊的常量 - 布尔值, TRUE 或 FALSE
-
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
-
- 使用样例
- 配置参数
- 默认情况下是不支持属性过滤的, 需要通过配置参数开启:
enablePropertyFilter=true
-
在配置文件中配置对应的参数(conf/broker.conf)
brokerClusterName = DefaultCluster brokerName = broker-a namesrvAddr=192.168.48.102:9876 brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH enablePropertyFilter=true
nohup bin/mqbroker -c conf/broker.conf &
通过-c 指定配置文件路径
- 默认情况下是不支持属性过滤的, 需要通过配置参数开启:
- 生产者
- 发送消息时,你能通过putUserProperty来设置消息的属性
public class Producer { public static void main(String[] args) throws Exception { // 实例化一个生产者来产生过滤消息 DefaultMQProducer producer = new DefaultMQProducer("08_filter"); // 启动生产者 producer.start(); Message msg = new Message("08_filter", "TagA", "9527", "hello, rokcetmq 007".getBytes()); msg.putUserProperty("age", "28"); msg.putUserProperty("score", "70"); producer.send(msg); // 关闭生产者 producer.shutdown(); } }
-
消费者
public class Consumer { public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("08_filter"); // 订阅Topics consumer.subscribe("08_filter", MessageSelector.bySql(" age < 30 and score > 80")); // 注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println(message.getProperties()); System.out.println("Receive message[msgId=" + message.getMsgId() + "] :"+new String(message.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); System.out.println("消费者启动成功"); } }
- 配置参数
批量发送消息(了解)
- 批量发送消息能显著提高传递消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
-
使用场景:单个消息非常小,为了节省网络消耗(没发送一条消息都要消耗网络)。
List<Message> messages = new ArrayList<>(); //将消息包装到一个集合中 for (int i = 0; i < 5; i++) { Message msg = new Message("03_oneway","TagB",i+"","hello, NBA".getBytes()); messages.add(msg); } //批量发送 producer.sendOneway(messages);
SpringBoot集成使用
- 新建2个springboot项目
- 14-rocketmq-spboot-producer: File-new Module->Spring Initializr ->next ->group:com.zh.demo Artifact:14-rocketmq-spboot-producer-》next-》勾选Lombok、Spring web-》next
- 同理创建14-rocketmq-spboot-consumer项目
-
导入依赖(2个都导入)
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
- 生产者
-
添加配置参数
//nameserver地址 rocketmq.name-server=192.168.48.102:9876 //生产者组 rocketmq.producer.group=my_group //当前服务端口 server.port=9999
-
代码
@RestController public class HelloController { @Autowired private RocketMQTemplate rocketMQTemplate; private String topic="01_boot_hello"; // 需求: 把接收的msg消息发送到mq的topic中 @RequestMapping("01_hello") public String hello(String msg) throws Exception{ System.out.println("msg = " + msg); //发送消息 SendResult sendResult = rocketMQTemplate.syncSend(topic, msg); return sendResult.getSendStatus().toString(); } }
-
- 消费者
-
添加配置参数
rocketmq.name-server=127.0.0.1:9876 server.port=7777
-
消费者
//消费者 //1 必须是容器组件 @Component //类上要加上@RocketMQMessageListener注解 @RocketMQMessageListener( //当前的消费组名称 consumerGroup = "test-9657", //要监听的topic topic = "01_boot_hello", //集群模式 messageModel = MessageModel.CLUSTERING ) public class HelloConsumer implements RocketMQListener<MessageExt> { //回调方法 @Override public void onMessage(MessageExt ext) { System.out.println("接收到的消息: message = " + new String(ext.getBody())); System.out.println("ext = " + JSON.toJSONString(ext)); } }
-
- 注意:
- 实际开发建议:一个应用最好只有一个Topic,对于消息类型可以使用Tag标记进行区分
-
一个项目中可以有多个消费者进行监听,但是每个消费者的组名必须唯一
//当前的消费组名称 consumerGroup = "test-9657",
生产消息类型
-
同步消息、异步消息、一次性消息
@RestController public class TypeController { @Autowired private RocketMQTemplate rocketMQTemplate; private String topic="02_boot_type"; @RequestMapping("02_type") public String sendMsg(String msg){ //1 同步发送 SendResult sendResult = rocketMQTemplate.syncSend(topic, msg); // 2 异步发送 rocketMQTemplate.asyncSend(topic, msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("异步消息发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("异步消息发送失败"); } }); //3 一次性消息 rocketMQTemplate.sendOneWay(topic,msg); return "success"; } }
拉式消费
-
在推式消费基础上,消息监听实现RocketMQPushConsumerLifecycleListener ,重写prepareStart方法
@Component @RocketMQMessageListener( consumerGroup = "test-9657", topic = "01_boot_hello", messageModel = MessageModel.CLUSTERING ) /* * RocketMQPushConsumerLifecycleListener:当@RocketMQMessageListener中的配置不⾜以满⾜我们的需求时,可以实现该接⼝直接更改消费者类DefaultMQPushConsumer的配置 */ public class HelloConsumer3 implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener { //回调方法 @Override public void onMessage(MessageExt ext) { } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { //设置每次消息拉取的时间间隔 单位 毫秒 defaultMQPushConsumer.setPullInterval(1000); //最小消费线程池数 defaultMQPushConsumer.setConsumeThreadMin(5); //最大消费线程池数 defaultMQPushConsumer.setConsumeThreadMax(15); //每次消费(即将多条消息合并为List消费)的最大消息数目,消费线程每次消费的最大消息的数量,默认值为1 defaultMQPushConsumer.setConsumeMessageBatchMaxSize(5); //重复消费次数,用于失败后重试,默认-1 defaultMQPushConsumer.setMaxReconsumeTimes(8); //设置每个队列每次拉取的最大消费数,即拉取线程每次从broker拉取的消息量 defaultMQPushConsumer.setPullBatchSize(10); defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("接收消息线程: "+Thread.currentThread().getName()); for (MessageExt msg : msgs) { System.out.println("接收到的消息:"+new String(msg.getBody())); } //确定消息消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); } }
- pullBatchSize的大小受制于broker配置文件中 maxTransferCountOnMessageInMemory 参数的设置,该参数默认设置为 32,也即是每次从服务端拉取的最大的数量不能超过32,因此即使设置 pullBatchSize超过32,最后也只返回32。因此,若要每次拉取的消息量超过32,可以修改broker配置文件里该参数的值,并重启broker服务。
- 每个消费线程消费批次消费的消息数量,consumeMessageBatchMaxSize 也跟pullBatchSize相关,消费线程实际上每次消费的消息数量不会大于 pullBatchSize,具体可以查看ConsumeMessageConcurrentlyService.submitConsumeRequest(),消费任务的提交。
- 因此,实际上 consumeMessageBatchMaxSize <= pullBatchSize <= maxTransferCountOnMessageInMemory
消费模式
-
集群模式
@Component @RocketMQMessageListener( consumerGroup = "02_cluster_model", topic = "02_cluster_model", messageModel = MessageModel.CLUSTERING ) public class ClusterModelConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("集群模式消费消息: "+message); } }
-
广播模式
//对于广播模式, 实时消费消息, 在广播模式消费者启动之前的消息, 是接收不到 // 2 对于广播模式, 发送失败的消息不会重试 @Component @RocketMQMessageListener( consumerGroup = "02_broadcasting_model", topic = "02_broadcasting_model", messageModel = MessageModel.BROADCASTING ) public class BrandCastModelConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("广播模式消费消息: "+message); } }
延时消息
@RestController
public class DelayController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private String topic="03_boot_delay";
// 需求: 把接收的msg消息发送到mq的topic中
@RequestMapping("03_delay")
public String sendMsg(String msg) throws Exception {
//设置延迟消息
//方式一: 直接使用原生的api
DefaultMQProducer producer = rocketMQTemplate.getProducer();
Message message = new Message(topic, "TagA", "9527", msg.getBytes());
message.setDelayTimeLevel(3);
//在实际工作中, 确保消息可靠性, 捕获对应的异常
producer.send(message);
//方式二: 找可以使用延迟级别的API
org.springframework.messaging.Message<String> msg2 = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend(topic,msg2,3000,3);
return "success";
}
}
设置消息标签
//使用功能tag对消息进行分类
@RestController
public class TagController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private String topic="04_boot_tag";
private String tag="TagA";
@RequestMapping("04_tag")
public String sendMsg(String msg){
rocketMQTemplate.syncSend(topic+":"+tag,msg);
return "Success";
}
}
设置消息的Key
@RestController
public class KeyController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private String topic="05_boot_key";
private String tag="TagA";
private String key="9627";
@RequestMapping("05_key")
public String sendMsg(String msg){
org.springframework.messaging.Message<String> msg2 = MessageBuilder.withPayload(msg)
.setHeader(MessageConst.PROPERTY_KEYS,key).build();
rocketMQTemplate.syncSend(topic,msg2);
return "Success";
}
}
自定义属性设置
@RestController
public class PropsController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private String topic="06_boot_props";
private String tag="TagA";
private String key="9627";
@RequestMapping("05_key")
public String sendMsg(String msg){
HashMap map = new HashMap<>();
//通过Map添加自定义属性
map.put("name","rose");
rocketMQTemplate.convertAndSend(topic,msg,map);
return "Success";
}
}
消息过滤
@Component
@RocketMQMessageListener(
consumerGroup = "02_cluster_model",
topic = "02_cluster_model",
messageModel = MessageModel.CLUSTERING,
//tag过滤
// selectorType = SelectorType.TAG;
// selectorExpression = "TagA";
//sql过滤
selectorType = SelectorType.SQL92,
selectorExpression = " name ='rose' or age>18 "
)
public class FilterConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("集群模式消费消息: "+message);
}
}
- 过滤设置:需要开启broker的支持用户属性配置
enablePropertyFilter=true
发送消息的方式
- 直接使用rocketMQTemplate
- 使用DefaultMQProducer对象
- 使用Spring的Message接口