消息可靠投递,Consumer ACK,消费者限流(削峰填谷),TTL,DLX死信队列,延迟队列,消息追踪
RabbitMQ-高级特性
消息可靠投递
producer到RabbitMQ
producer→exchange
通过confirmCallback回调函数确认消息的投递
依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>配置文件
1
2
3
4
5rabbitmq.host=192.168.3.3
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/xml配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory 1. 设置 publisher-confirms="true" -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
/>
<rabbit:admin connection-factory="connectionFactory"/>
<!--rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--2. 消息可靠性投递(生产端)-->
<!--队列-->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<!--交换机-->
<rabbit:direct-exchange name="test_exchange_confirm">
<!--交换机绑定队列-->
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"> </rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 确认模式:
* 步骤:
* 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
*/
@Test
public void testConfirm() {
//2. 定义回调 **
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做处理,如让消息再次发送。
}
}
});
//3. 发送消息
//rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
rabbitTemplate.convertAndSend("test_exchange_confirm1", "confirm", "message confirm....");
}
}exchange→queue
returnCallback
xml开启配置return-true。增加publisher-returns=”true”
1
2
3
4
5
6
7
8
9
10
11<!-- 定义rabbitmq connectionFactory 1. 设置 publisher-confirms="true" -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
<!-- 开启配置return-true -->
publisher-returns="true"
/>producer中编写测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43/**
* 步骤:
* 1. 开启回退模式:publisher-returns="true"
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 1. 如果消息没有路由到Queue,则丢弃消息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/
@Test
public void testReturn() {
//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
//2.设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
//处理
}
});
//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
Consumer ACK
RabbitMQ到consumer
消费端收到消息后的确认方式有三种。
- 自动确认:acknowledge=”none“,当消息被Consumer接收到,则自动确认收到,并将message 从 RabbitMQ 的消息缓存中移除。
- 手动确认:acknowledge=”manual“,在业务处理完成后,调用channel.basicAck(),手动签收;如果出现异常,则调用channel.basicNack()方法,让其重新发送消息。
- 根据异常情况确认:acknowledge=”auto“
xml中配置手动接收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<context:component-scan base-package="com.xiaoruiit.listener" />
<!--定义监听器容器 添加 acknowledge="manual" 手动-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" >
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm">
</rabbit:listener>
</rabbit:listener-container>
</beans>监听类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46package com.xiaoruiit.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* Consumer ACK机制:
* 1. 设置手动签收。acknowledge="manual"
* 2. 让监听器类实现ChannelAwareMessageListener接口
* 3. 如果消息成功处理,则调用channel的 basicAck()签收
* 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
*/
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
//4.拒绝签收
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
// 了解
//channel.basicReject(deliveryTag,true);
}
}
}编写测试类
1
2
3
4
5
6
7
8
9
10
11@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test(){
while (true){
}
}
}
消费者限流
削(xue)峰填谷,RabbitMQ将瞬时的高并发消息慢慢分发给对消息后续处理的系统。
作用:
缓解瞬时压力。
保护处理RabbitMQ消息的系统,防止系统崩溃。高可用
implementation:
Consumer需要设置为手动签收
监听类
1
2
3
4
5
6
7
8
9
10
11
12
13
14@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//1.获取消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
//3. 签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}设置手动签收 acknowledge=”manual”
设置每次处理多少条消息后再次获取消息。perfetch = 1
1
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
TTL
解释:当消息到达存活时间后,还没有被消费,会被自动清除。
可设置消息或队列的过期时间。
队列的过期可在
xml的queue标签配置,消息过期在发送消息时增加过期时间的参数即可。
注意:单条消息和队列都设置了过期时间时,以短的为准。
示例:订单系统发送到RabbitMQ的消息设置为30分钟过期。交易系统30分钟不取走订单消息,则订单关闭。
implementation:
设置队列过期时间:
1
2
3
4
5
6
7
8
9
10
11
12
13
14<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--x-message-ttl指队列的过期时间,注意Integer类型-->
<entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>测试
1
2
3
4
5
6
7
8
9@Test
public void testTtl() {
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl",
"ttl.hehe", "message ttl....");
}
}设置单个消息的过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26@Test
public void testTtl() {
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息
message.getMessageProperties().setExpiration("5000");//消息的过期时间
//2.返回该消息
return message;
}
};
//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
for (int i = 0; i < 10; i++) {
if(i == 5){
//消息过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
}else{
//不过期的消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
}
}
DLX 死信队列
死信:
- TTL过期的消息
- 未签收的消息,并且没把消息重新放入原队列,requeue=false
- 超过队列的长度
implementation
设置正常队列,正常交换机,正常交换机绑定正常队列
1
2
3
4
5
6
7<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>设置死信队列,死信交换机,死信交换机绑定死信队列
1
2
3
4
5
6<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>设置正常队列的死信绑定死信交换机,设置正常队列到死信交换机的路由key
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交换机名称-->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<!--4.1 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<!--4.2 设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>producer 测试方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
* 发送测试死信消息:
* 1. 时间过期
* 2. 超过长度
* 3. 消息拒收
*/
@Test
public void testDlx(){
//1. 测试时间过期,死信消息
rabbitTemplate.convertAndSend("test_exchange_dlx",
"test.dlx.haha","我是一条时间过期的消息");
//2. 测试超过长度后,消息死信
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx",
"test.dlx.haha","我是一条超过长度的消息");
}
//3. 测试消息拒收
rabbitTemplate.convertAndSend("test_exchange_dlx",
"test.dlx.haha","我是一条消息拒收");
}
延迟队列
TTL+死信队列
架构图:
需求:订单30分钟未支付,取消订单,并回滚库存。
实现思路:正常队列设置30分钟有效期,30分钟内不做处理。30分钟后转入另一个队列判断订单状态,并处理
implementation
xml
1 | <!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)--> |
测试代码
1 | @Test |
消息追踪
问题:配置写错,producer宕机、consumer宕机,服务器宕机、连接断开可能导致消息不能成功发送。
解决:通过追踪消息可以排查这些问题。
Firehose、rabbitmq_tracing插件都可以实现消息追踪。
Firehose是额外发送一条消息到系统内部的一个交换机(amq.rabbitmq.trace)。
rabbitmq_tracing是额外发送一条消息(更详细)到内部交换机,并将消息记录到mytrace.log文件中。系统内部发送的消息不容易失败。
开启
1 | rabbitmq-plugins enable rabbitmq_tracing |
测试
1 | 控制台发送一条消息 |
log会显示详细信息