0%

RabbitMQ-高级特性

消息可靠投递,Consumer ACK,消费者限流(削峰填谷),TTL,DLX死信队列,延迟队列,消息追踪

RabbitMQ-高级特性

消息可靠投递

producer到RabbitMQ

  • producer→exchange

    通过confirmCallback回调函数确认消息的投递

    依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    <dependencies>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.1.7.RELEASE</version>
    </dependency>

    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.1.8.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>5.1.7.RELEASE</version>
    </dependency>
    </dependencies>

    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.0</version>
    <configuration>
    <source>1.8</source>
    <target>1.8</target>
    </configuration>
    </plugin>
    </plugins>
    </build>

    配置文件

    1
    2
    3
    4
    5
    rabbitmq.host=192.168.3.3
    rabbitmq.port=5672
    rabbitmq.username=guest
    rabbitmq.password=guest
    rabbitmq.virtual-host=/

    xml配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    https://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory 1. 设置 publisher-confirms="true" -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
    port="${rabbitmq.port}"
    username="${rabbitmq.username}"
    password="${rabbitmq.password}"
    virtual-host="${rabbitmq.virtual-host}"

    publisher-confirms="true"
    />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!--rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!--2. 消息可靠性投递(生产端)-->
    <!--队列-->
    <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
    <!--交换机-->
    <rabbit:direct-exchange name="test_exchange_confirm">
    <!--交换机绑定队列-->
    <rabbit:bindings>
    <rabbit:binding queue="test_queue_confirm" key="confirm"> </rabbit:binding>
    </rabbit:bindings>
    </rabbit:direct-exchange>

    </beans>

    测试代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
    public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
    * 确认模式:
    * 步骤:
    * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
    * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
    */
    @Test
    public void testConfirm() {

    //2. 定义回调 **
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    /**
    *
    * @param correlationData 相关配置信息
    * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
    * @param cause 失败原因
    */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    System.out.println("confirm方法被执行了....");

    if (ack) {
    //接收成功
    System.out.println("接收成功消息" + cause);
    } else {
    //接收失败
    System.out.println("接收失败消息" + cause);
    //做处理,如让消息再次发送。
    }
    }
    });

    //3. 发送消息
    //rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
    rabbitTemplate.convertAndSend("test_exchange_confirm1", "confirm", "message confirm....");
    }
    }
  • exchange→queue

    returnCallback

    1. xml开启配置return-true。增加publisher-returns=”true”

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      <!-- 定义rabbitmq connectionFactory  1. 设置  publisher-confirms="true" -->
      <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
      port="${rabbitmq.port}"
      username="${rabbitmq.username}"
      password="${rabbitmq.password}"
      virtual-host="${rabbitmq.virtual-host}"

      publisher-confirms="true"
      <!-- 开启配置return-true -->
      publisher-returns="true"
      />
    2. producer中编写测试代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
       /**
      * 步骤:
      * 1. 开启回退模式:publisher-returns="true"
      * 2. 设置ReturnCallBack
      * 3. 设置Exchange处理消息的模式:
      * 1. 如果消息没有路由到Queue,则丢弃消息(默认)
      * 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
      */

      @Test
      public void testReturn() {

      //设置交换机处理失败消息的模式
      rabbitTemplate.setMandatory(true);

      //2.设置ReturnCallBack
      rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
      /**
      *
      * @param message 消息对象
      * @param replyCode 错误码
      * @param replyText 错误信息
      * @param exchange 交换机
      * @param routingKey 路由键
      */
      @Override
      public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
      System.out.println("return 执行了....");

      System.out.println(message);
      System.out.println(replyCode);
      System.out.println(replyText);
      System.out.println(exchange);
      System.out.println(routingKey);

      //处理
      }
      });


      //3. 发送消息
      rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
      }

Consumer ACK

RabbitMQ到consumer

消费端收到消息后的确认方式有三种。

  • 自动确认:acknowledge=”none“,当消息被Consumer接收到,则自动确认收到,并将message 从 RabbitMQ 的消息缓存中移除。
  • 手动确认:acknowledge=”manual“,在业务处理完成后,调用channel.basicAck(),手动签收;如果出现异常,则调用channel.basicNack()方法,让其重新发送消息。
  • 根据异常情况确认:acknowledge=”auto
  1. xml中配置手动接收

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    https://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
    port="${rabbitmq.port}"
    username="${rabbitmq.username}"
    password="${rabbitmq.password}"
    virtual-host="${rabbitmq.virtual-host}"/>


    <context:component-scan base-package="com.xiaoruiit.listener" />

    <!--定义监听器容器 添加 acknowledge="manual" 手动-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" >
    <rabbit:listener ref="ackListener" queue-names="test_queue_confirm">
    </rabbit:listener>
    </rabbit:listener-container>

    </beans>
  2. 监听类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    package com.xiaoruiit.listener;

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;

    import java.io.IOException;

    /**
    * Consumer ACK机制:
    * 1. 设置手动签收。acknowledge="manual"
    * 2. 让监听器类实现ChannelAwareMessageListener接口
    * 3. 如果消息成功处理,则调用channel的 basicAck()签收
    * 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
    */
    @Component
    public class AckListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();

    try {
    //1.接收转换消息
    System.out.println(new String(message.getBody()));

    //2. 处理业务逻辑
    System.out.println("处理业务逻辑...");
    int i = 3/0;//出现错误
    //3. 手动签收
    channel.basicAck(deliveryTag,true);
    } catch (Exception e) {
    //e.printStackTrace();

    //4.拒绝签收
    /*
    第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
    */
    channel.basicNack(deliveryTag,true,true);
    // 了解
    //channel.basicReject(deliveryTag,true);
    }
    }
    }
  3. 编写测试类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
    public class ConsumerTest {

    @Test
    public void test(){
    while (true){

    }
    }
    }

消费者限流

削(xue)峰填谷,RabbitMQ将瞬时的高并发消息慢慢分发给对消息后续处理的系统。

作用:

  • 缓解瞬时压力。

  • 保护处理RabbitMQ消息的系统,防止系统崩溃。高可用

implementation:

  • Consumer需要设置为手动签收

    监听类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Component
    public class QosListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

    Thread.sleep(1000);
    //1.获取消息
    System.out.println(new String(message.getBody()));
    //2. 处理业务逻辑
    //3. 签收
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
    }
  • 设置手动签收 acknowledge=”manual”

  • 设置每次处理多少条消息后再次获取消息。perfetch = 1

    1
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >

TTL

解释:当消息到达存活时间后,还没有被消费,会被自动清除。

可设置消息或队列的过期时间。

  • 队列的过期可在xmlqueue标签配置,

  • 消息过期在发送消息时增加过期时间的参数即可。

注意:单条消息和队列都设置了过期时间时,以短的为准。

示例:订单系统发送到RabbitMQ的消息设置为30分钟过期。交易系统30分钟不取走订单消息,则订单关闭。

implementation:

  • 设置队列过期时间:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    <!--ttl-->
    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
    <!--设置queue的参数-->
    <rabbit:queue-arguments>
    <!--x-message-ttl指队列的过期时间,注意Integer类型-->
    <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:topic-exchange name="test_exchange_ttl" >
    <rabbit:bindings>
    <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
    </rabbit:bindings>
    </rabbit:topic-exchange>

    测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    public void testTtl() {

    for (int i = 0; i < 10; i++) {
    // 发送消息
    rabbitTemplate.convertAndSend("test_exchange_ttl",
    "ttl.hehe", "message ttl....");
    }
    }
  • 设置单个消息的过期时间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    @Test
    public void testTtl() {
    // 消息后处理对象,设置一些消息的参数信息
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
    //1.设置message的信息
    message.getMessageProperties().setExpiration("5000");//消息的过期时间
    //2.返回该消息
    return message;
    }
    };

    //消息单独过期
    rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
    for (int i = 0; i < 10; i++) {
    if(i == 5){
    //消息过期
    rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
    }else{
    //不过期的消息
    rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
    }
    }
    }

DLX 死信队列

死信:

  • TTL过期的消息
  • 未签收的消息,并且没把消息重新放入原队列,requeue=false
  • 超过队列的长度

implementation

  1. 设置正常队列,正常交换机,正常交换机绑定正常队列

    1
    2
    3
    4
    5
    6
    7
    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
    <rabbit:bindings>
    <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
    </rabbit:bindings>
    </rabbit:topic-exchange>
  2. 设置死信队列,死信交换机,死信交换机绑定死信队列

    1
    2
    3
    4
    5
    6
    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
    <rabbit:bindings>
    <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
    </rabbit:bindings>
    </rabbit:topic-exchange>
  3. 设置正常队列的死信绑定死信交换机,设置正常队列到死信交换机的路由key

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
    <!--3. 正常队列绑定死信交换机-->
    <rabbit:queue-arguments>
    <!--3.1 x-dead-letter-exchange:死信交换机名称-->
    <entry key="x-dead-letter-exchange" value="exchange_dlx" />

    <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
    <entry key="x-dead-letter-routing-key" value="dlx.hehe" />

    <!--4.1 设置队列的过期时间 ttl-->
    <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
    <!--4.2 设置队列的长度限制 max-length -->
    <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
    </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
    <rabbit:bindings>
    <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
    </rabbit:bindings>
    </rabbit:topic-exchange>
  4. producer 测试方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * 发送测试死信消息:
    * 1. 时间过期
    * 2. 超过长度
    * 3. 消息拒收
    */
    @Test
    public void testDlx(){
    //1. 测试时间过期,死信消息
    rabbitTemplate.convertAndSend("test_exchange_dlx",
    "test.dlx.haha","我是一条时间过期的消息");

    //2. 测试超过长度后,消息死信
    for (int i = 0; i < 20; i++) {
    rabbitTemplate.convertAndSend("test_exchange_dlx",
    "test.dlx.haha","我是一条超过长度的消息");
    }

    //3. 测试消息拒收
    rabbitTemplate.convertAndSend("test_exchange_dlx",
    "test.dlx.haha","我是一条消息拒收");

    }

延迟队列

TTL+死信队列

架构图:

需求:订单30分钟未支付,取消订单,并回滚库存。

实现思路:正常队列设置30分钟有效期,30分钟内不做处理。30分钟后转入另一个队列判断订单状态,并处理

implementation

xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
<rabbit:queue id="order_queue" name="order_queue">
<!-- 3. 绑定,设置正常队列过期时间30分钟-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

<!-- 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

测试代码

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2020");

//2.打印倒计时10秒
for (int i = 10; i > 0 ; i--) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}

消息追踪

问题:配置写错,producer宕机、consumer宕机,服务器宕机、连接断开可能导致消息不能成功发送。

解决:通过追踪消息可以排查这些问题。

Firehose、rabbitmq_tracing插件都可以实现消息追踪。

Firehose是额外发送一条消息到系统内部的一个交换机(amq.rabbitmq.trace)。

rabbitmq_tracing是额外发送一条消息(更详细)到内部交换机,并将消息记录到mytrace.log文件中。系统内部发送的消息不容易失败。

开启

1
rabbitmq-plugins enable rabbitmq_tracing

测试

1
控制台发送一条消息

log会显示详细信息