第13章-RabbitMQ.md 55 KB

第13章-RabbitMQ

学习目标:

  • 能够说出Rabbitmq应用场景
  • 能够说出Rabbitmq消息不丢失解决方案
  • 掌握Rabbitmq实现普通消息的收发
  • 掌握Rabbitmq实现延迟消息
  • 基于Rabbitmq消息队列实现商品数据同步
  • 基于Rabbitmq消息队列实现订单延迟关闭

1、目前存在的问题

1.1 搜索与商品服务的问题

我们思考一下,是否存在问题?

  • 商品的原始数据保存在数据库中,增删改查都在数据库中完成

  • 搜索服务数据来源是索引库,如果数据库商品发生变化,索引库数据不能及时更新

如果我们在后台修改了商品的价格,搜索页面依然是旧的价格,这样显然不对。该如何解决?

这里有两种解决方案:

  • 方案1:每当后台对商品做增删改操作,同时要修改索引库数据

  • 方案2:搜索服务对外提供操作接口,后台在商品增删改后,调用接口

以上两种方式都有同一个严重问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。

所以,我们会通过另外一种方式来解决这个问题:消息队列

1.2 订单服务取消订单问题

用户下单后,如果1个小时未支付,我们该如何取消订单

  • 方案1:定时任务,定时扫描未支付订单,超过2小时自动关闭

  • 方案2:使用延迟队列关闭订单

1.3 分布式事务问题

如:用户支付订单,我们如何保证更新订单状态与扣减库存 ,三个服务数据最终一致!

2、消息队列解决什么问题

消息队列都解决了什么问题?

2.1 异步

img

2.2 解耦

img

2.3 并行

img

2.4 排队

img

3、消息队列工具 RabbitMQ

3.1 常见MQ产品

  • ActiveMQ:基于JMS(Java Message Service)协议,java语言,jdk

  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会

  • Kafka:分布式消息系统,高吞吐量

3.2 RabbitMQ基础概念

img

Broker:简单来说就是消息队列服务器实体

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列(不存消息)

Queue:消息队列载体,每个消息都会被投入到一个或多个队列

Binding:绑定,它的作用就是把 exchange和 queue按照路由规则绑定起来

Routing Key:路由关键字, exchange根据这个关键字进行消息投递

vhost:虚拟主机,一个 broker里可以开设多个 vhost,用作不同用户的权限分离

producer:消息生产者,就是投递消息的程序

consumer:消息消费者,就是接受消息的程序

channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel代表一个会话任务

3.3 安装RabbitMQ

看电商软件环境安装.doc

访问IP地址:http://192.168.200.128:15672

  • 用户名:admin

  • 密 码:admin

3.4 五种消息模型

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。

但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。

img

  • 基本消息模型:生产者–>队列–>消费者

  • work消息模型:生产者–>队列–>多个消费者竞争消费

  • 订阅模型-Fanout:广播模式,将消息交给所有绑定到交换机的队列,每个消费者都会收到同一条消息

  • 订阅模型-Direct:定向,把消息交给符合指定 rotingKey 的队列

  • 订阅模型-Topic 主题模式:通配符,把消息交给符合routing pattern(路由模式) 的队列

我们项目使用的是第四种!

3.5 搭建mq测试环境service-mq

3.5.1 搭建service-mq服务

gmall_service模块下新建模块:service-mq 作用:类似某个业务微服务,发送消息(代码可以复用-抽取到Rabbit-util模块),监听消息(每个业务模块不同)

image-20221208224406411

3.5.2 启动类

package com.atguigu.gmall;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源自动配置
@EnableDiscoveryClient
public class MqDemoApp {

   public static void main(String[] args) {
      SpringApplication.run(MqDemoApp.class, args);
   }
}

3.5.3 添加配置文件

resources目录下新建配置文件:bootstrap.properties

spring.application.name=service-mq
spring.profiles.active=dev
spring.cloud.nacos.discovery.server-addr=192.168.200.128:8848
spring.cloud.nacos.config.server-addr=192.168.200.128:8848
spring.cloud.nacos.config.prefix=${spring.application.name}
spring.cloud.nacos.config.file-extension=yaml
spring.cloud.nacos.config.shared-configs[0].data-id=common.yaml

Nacos配置中心提供common.yaml配置文件,说明:rabbitmq默认端口5672

4、消息不丢失

消息的不丢失,在MQ角度考虑,一般有三种途径:

  1. 生产者不丢数据

  2. MQ服务器不丢数据

  3. 消费者不丢数据

保证消息不丢失有两种实现方式:

  • 开启事务模式

  • 消息息确认模式(生产者,消费者)

说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式

4.1 消息确认

4.1.1 消息持久化

如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化

Exchange

声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)

Queue

声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)

message

发送消息时通过设置deliveryMode=2持久化消息

4.1.2 发送确认

有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

4.1.3 手动消费确认

有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?

要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

4.2 消息确认业务封装

4.2.1 service-mq修改配置

开启rabbitmq消息确认配置,在common的配置文件中都已经配置好了!

spring:
  rabbitmq:
    host: 192.168.200.128
    port: 5672
    username: admin
    password: admin
    publisher-confirm-type: correlated  #交换机的确认 异步回调ConfirmCallback
    publisher-returns: true  # 队列的确认 异步回调ReturnCallback
    listener:
      simple:
        acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
        prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发

4.2.2 搭建rabbit-util模块

由于消息队列是公共模块,我们把mq的相关代码(生产者)封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可

  1. gmall-common模块下新增模块:rabbit-util 。搭建方式如common-util

image-20221208225353770

  1. pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <parent>
           <artifactId>gmall-common</artifactId>
           <groupId>com.atguigu.gmall</groupId>
           <version>1.0</version>
       </parent>
       <modelVersion>4.0.0</modelVersion>
       
       <artifactId>rabbit-util</artifactId>
       
       <dependencies>
           <!--AMQP依赖,包含RabbitMQ-->
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-amqp</artifactId>
           </dependency>
       </dependencies>
    </project>
    
  2. rabbit-util中提供常量类 MqConst

    package com.atguigu.gmall.common.rabbit.config;
       
    public class MqConst {
       /**
        * 消息补偿
        */
       public static final String MQ_KEY_PREFIX = "mq:list";
       public static final int RETRY_COUNT = 3;
       /**
        * 商品上下架
        */
       public static final String EXCHANGE_DIRECT_GOODS = "exchange.direct.goods";
       public static final String ROUTING_GOODS_UPPER = "goods.upper";
       public static final String ROUTING_GOODS_LOWER = "goods.lower";
       //队列
       public static final String QUEUE_GOODS_UPPER  = "queue.goods.upper";
       public static final String QUEUE_GOODS_LOWER  = "queue.goods.lower";
       /**
        * 取消订单,发送延迟队列
        */
       public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "exchange.direct.order.cancel";//"exchange.direct.order.create" test_exchange;
       public static final String ROUTING_ORDER_CANCEL = "order.create";
       //延迟取消订单队列
       public static final String QUEUE_ORDER_CANCEL  = "queue.order.cancel";
       //取消订单 延迟时间 单位:秒 真实业务
       public static final int DELAY_TIME  = 24*60*60;
       //  测试取消订单
       // public static final int DELAY_TIME  = 3;
       /**
        * 订单支付
        */
       public static final String EXCHANGE_DIRECT_PAYMENT_PAY = "exchange.direct.payment.pay";
       public static final String ROUTING_PAYMENT_PAY = "payment.pay";
       //队列
       public static final String QUEUE_PAYMENT_PAY  = "queue.payment.pay";
       
       /**
        * 减库存
        */
       public static final String EXCHANGE_DIRECT_WARE_STOCK = "exchange.direct.ware.stock";
       public static final String ROUTING_WARE_STOCK = "ware.stock";
       //队列
       public static final String QUEUE_WARE_STOCK  = "queue.ware.stock";
       /**
        * 减库存成功,更新订单状态
        */
       public static final String EXCHANGE_DIRECT_WARE_ORDER = "exchange.direct.ware.order";
       public static final String ROUTING_WARE_ORDER = "ware.order";
       //队列
       public static final String QUEUE_WARE_ORDER  = "queue.ware.order";
       
       /**
        * 关闭交易
        */
       public static final String EXCHANGE_DIRECT_PAYMENT_CLOSE = "exchange.direct.payment.close";
       public static final String ROUTING_PAYMENT_CLOSE = "payment.close";
       //队列
       public static final String QUEUE_PAYMENT_CLOSE  = "queue.payment.close";
       /**
        * 定时任务
        */
       public static final String EXCHANGE_DIRECT_TASK = "exchange.direct.task";
       public static final String ROUTING_TASK_1 = "seckill.task.1";
       //队列
       public static final String QUEUE_TASK_1  = "queue.task.1";
       /**
        * 秒杀
        */
       public static final String EXCHANGE_DIRECT_SECKILL_USER = "exchange.direct.seckill.user";
       public static final String ROUTING_SECKILL_USER = "seckill.user";
       //队列
       public static final String QUEUE_SECKILL_USER  = "queue.seckill.user";
       
       /**
        * 定时任务
        */
       
       public static final String ROUTING_TASK_18 = "seckill.task.18";
       //队列
       public static final String QUEUE_TASK_18  = "queue.task.18";
       
       
       
       /**
        * 秒杀减库存
        */
       public static final String EXCHANGE_DIRECT_SECKILL_STOCK = "exchange.direct.seckill.stock";
       public static final String ROUTING_SECKILL_STOCK = "seckill.stock";
       //队列
       public static final String QUEUE_SECKILL_STOCK  = "queue.seckill.stock";
    }
    

4.2.3 生产者消息确认机制

rabbit-util中添加类

package com.atguigu.gmall.rabbit.config;

import com.atguigu.gmall.rabbit.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * 对生产者使用模板对象进行设置生产者回调
 * 1.在配置文件中开启生产者确认
 * publisher-confirm-type: correlated  #设置交换机异步回调
 * publisher-returns: true            #开启队列异常回调
 * 2.在代码中对模板对象RabbitTemplate设置两个回调方法
 * 交换机回调(包含正常,异常情况)
 * 队列异常回调(交换机路由消息失败)
 *
 * @author: atguigu
 * @create: 2023-08-11 14:08
 */
@Slf4j
@Configuration
public class MQProducerConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * 项目启动后会自动执行该方法
     */
    @PostConstruct
    public void init() {
        log.info("项目启动后,进行了RabbitTemplate模板对象设置回调");
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }


    /**
     * 交换机确认:当生产者发送消息正常到达交换机,一级交换机未收到消息,都会执行该回调方法
     *
     * @param correlationData 相关数据 当生产者发送时设置“相关数据”才会有值
     * @param ack             true:消息正常发送交换机  false:消息发送到交换机失败
     * @param cause           当消息发送交换机失败,包含失败信息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("[生产者-交换机正常确认]发送消息到交换机成功");
        } else {
            //当消息发送交换机失败,进行重试
            log.error("[生产者-交换机异常确认]发送消息到交换机失败:{}", cause);
        }
    }

    /**
     * 队列确认:只有当交换机路由消息到队列失败,才会走队列确认回调方法
     *
     * @param message    回调消息对象
     * @param replyCode  应答码
     * @param replyText  应答文本.
     * @param exchange   交换机名称.
     * @param routingKey 路由键.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("[生产者-队列异常确认],消息路由queue失败,应答码={},原因={},交换机={},路由键={},消息={},进行生产者消息重发", replyCode, replyText, exchange, routingKey, message.toString());
    }
}

4.2.4 封装消息发送

rabbit-util 中添加类

package com.atguigu.gmall.rabbit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 所有业务模块需要发送消息工具类
 *
 * @author: atguigu
 * @create: 2023-09-16 15:27
 */
@Component
public class RabbitService {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    /***
     * 发送普通消息方法
     * @param exchange
     * @param routingKey
     * @param msg
     */
    public void sendMessage(String exchange, String routingKey, Object msg) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}

4.2.5 发送确认消息测试

service-mq引入rabbit-util模块依赖

<dependencies>
    <dependency>
        <groupId>com.atguigu.gmall</groupId>
        <artifactId>rabbit-util</artifactId>
        <version>1.0</version>
    </dependency>
</dependencies>

service-mq编写测试代码

消息发送端

package com.atguigu.gmall.mq.controller;

import com.atguigu.gmall.common.result.Result;
import com.atguigu.gmall.rabbit.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: atguigu
 * @create: 2023-09-16 15:32
 */
@RestController
@RequestMapping("/api/mq")
@Slf4j
public class ProducerController {


    @Autowired
    private RabbitService rabbitService;

    /**
     * 生产者发送消息
     *
     * @param data
     * @return
     */
    @GetMapping("/sendNormalMsg")
    public Result sendNormalMsg(@RequestParam("data") String data) {
        String exchange = "exchange.confirm";
        String routingKey = "routing.confirm";
        rabbitService.sendMessage(exchange, routingKey, data);
        log.info("发送消息成功");
        return Result.ok();
    }
}

消息接收端

在service-mq 中编写

package com.atguigu.gmall.mq.receiver;

import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * @author: atguigu
 * @create: 2023-09-16 15:41
 */
@Slf4j
@Component
public class NormalMsgReceiver {


    /**
     * @param data
     * @param channel 信道
     * @param message 消息对象
     * @RabbitListener:如果交换机跟队列已创建且完成绑定,只需要设置监听队列名称即可 queues = ""
     * @RabbitListener:如果交换机跟队列已都没有,扫描到指定交换机,队列,自动创建且完成绑定 消费者:负责监听消息
     */
    @SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("exchange.confirm"),
            value = @Queue("queue.confirm"),
            key = "routing.confirm"
    ))
    public void receiverMessage(String data, Channel channel, Message message) {
        if (StringUtils.isNotBlank(data)) {
            log.info("[消费者]监听到普通消息:{}", data);
            System.out.println("模拟处理XXX业务");
        }
        //默认消费者:Broker服务器只负责将消息投递,无论消费者业务成功或者失败,Broker自动将消息删除
        //消费者端进行手动应答:Broker服务器投递消息后,一致等待消费者进行应答,Broker收到应答才会将MQ队列中消息删除
        //basicAck业务正常执行完毕 p1:消息唯一标识从1开始递增 p2:是否批量确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        //basicNack业务非正常执行-进行重试(设置上限) p1:消息标识 p2:是否批量确认消息 p3:是否重新入队
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

        //basicReject消费者明确拒绝接受消息,被拒绝消息会进入死信队列 p1:消息标识 p3:是否重新入队
        //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }

}

测试:http://localhost:8282/api/mq/sendNormalMsg

4.2.6 消息发送失败,设置重发机制

实现思路:借助redis来实现重发机制

  1. rabbit-util 模块中添加依赖

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    
  2. rabbit-util模块中自定义一个实体类来接收消息

    package com.atguigu.gmall.common.model;
       
    import lombok.Data;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
       
    @Data
    public class GmallCorrelationData extends CorrelationData {
       
       //  默认有一个消息唯一表示的Id   private volatile String id;
       //  消息主体
       private Object message;
       //  交换机
       private String exchange;
       //  路由键
       private String routingKey;
       //  重试次数
       private int retryCount = 0;
       //  消息类型  是否是延迟消息
       private boolean isDelay = false;
       //  延迟时间
       private int delayTime = 10;
    }
    
  3. 修改rabbit-utilRabbitService中发送方法:sendMessage() 修改发送方法

    package com.atguigu.gmall.rabbit.service;
       
    import com.alibaba.fastjson.JSON;
    import com.atguigu.gmall.rabbit.model.GmallCorrelationData;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
       
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
       
    /**
    * 所有业务模块需要发送消息工具类
    *
    * @author: atguigu
    * @create: 2023-09-16 15:27
    */
    @Component
    public class RabbitService {
       
       
       @Autowired
       private RabbitTemplate rabbitTemplate;
       
       @Autowired
       private RedisTemplate redisTemplate;
       
       /***
        * 发送普通消息方法
        * @param exchange
        * @param routingKey
        * @param msg
        */
       public void sendMessage(String exchange, String routingKey, Object msg) {
           //1.发送消息同时设置相关数据
           GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();
           String id = UUID.randomUUID().toString().replaceAll("-", "");
           gmallCorrelationData.setId(id);
           gmallCorrelationData.setExchange(exchange);
           gmallCorrelationData.setRoutingKey(routingKey);
           gmallCorrelationData.setMessage(msg);
       
           //2.将相关数据存入Redis,设置5分钟有效期
           String key = "mq:" + id;
           String correlationDataStr = JSON.toJSONString(gmallCorrelationData);
           redisTemplate.opsForValue().set(key, correlationDataStr, 5, TimeUnit.MINUTES);
       
           //3.执行发送消息-设置相关数据
           rabbitTemplate.convertAndSend(exchange, routingKey, msg, gmallCorrelationData);
       
       }
    }
    
  4. 发送失败调用重发方法 MQProducerAckConfig 类中修改confirm方法

    package com.atguigu.gmall.rabbit.config;
       
    import com.alibaba.fastjson.JSON;
    import com.atguigu.gmall.rabbit.model.GmallCorrelationData;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
       
    import javax.annotation.PostConstruct;
    import java.awt.*;
    import java.util.concurrent.TimeUnit;
       
    /**
    * MQProducerConfig类产生对象:既是ConfirmCallback接口类型 又是ReturnCallback接口类型
    *
    * @author: atguigu
    * @create: 2023-09-18 09:15
    */
    @Slf4j
    @Component
    public class MQProducerConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
       
       @Autowired
       private RabbitTemplate rabbitTemplate;
       
       @Autowired
       private RedisTemplate redisTemplate;
       
       
       /**
        * 项目启动成功后(RabbitTemplate对象在IOC容器中)自动执行一次
        */
       @PostConstruct
       public void init() {
           //设置交换机确认回调
           rabbitTemplate.setConfirmCallback(this);
           //设置队列确认异常回调
           rabbitTemplate.setReturnCallback(this);
           log.info("[RabbitMQ]生产者,项目启动后执行生产者确认设置");
       
       }
       
       
       /**
        * 生产者确认回调-交换机回调(正常,异常)
        *
        * @param correlationData 相关数据,只有当调用模板对象发送业务消息同时设置相关数据
        * @param ack             true:正常, false:异常
        * @param cause           异常原因
        */
       @Override
       public void confirm(CorrelationData correlationData, boolean ack, String cause) {
           if (!ack) {
               log.error("交换机异常回调,原因:{},进入到重试逻辑。", cause);
               if (correlationData != null && StringUtils.isNotBlank(correlationData.getId())) {
                   this.retryMessage(correlationData.getId());
               }
           }
       }
       
       
       /**
        * 生产者确认回调-队列确认异常。只要交换机路由消息到队列失败就会触发该方法回调
        *
        * @param message    MQ消息对象.
        * @param replyCode  异常码.
        * @param replyText  异常消息.
        * @param exchange   交换机名称.
        * @param routingKey 路由键名称.
        */
       @Override
       public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
           log.error("[生产者-队列异常确认],消息路由queue失败,应答码={},原因={},交换机={},路由键={},MQ消息={},进入到重试逻辑.", replyCode, replyText, exchange, routingKey, message.toString());
           //1.从MQ消息对象中属性信息中获取头信息 spring_returned_message_correlation -> 25fc60a2aa2d4981a6aaa140a0be5ec6
           String id = message.getMessageProperties().getHeader("spring_returned_message_correlation");
           if (StringUtils.isNotBlank(id)) {
               //2.进行重试
               this.retryMessage(id);
           }
       }
       
       
       /**
        * 当生产者发送消息异常,在人工介入前程序自动进行重试
        *
        * @param id 相关数据标识,根据该标识获取存在Redis中相关数据信息
        */
       private void retryMessage(String id) {
           //1.构建相关数据Key
           String key = "mq:" + id;
       
           //2.查询Redis中存放相关数据-注意该数据存采用Strin    g
           String correlationDataStr = (String) redisTemplate.opsForValue().get(key);
           GmallCorrelationData gmallCorrelationData = JSON.parseObject(correlationDataStr, GmallCorrelationData.class);
       
           //3.判断重试是否超限 阈值:3次
           int retryCount = gmallCorrelationData.getRetryCount();
           if (retryCount > 2) {
               //4.如果重试超限,写入生产者消息异常表(将来由人工处理)
               log.error("[RabbitMQ]消息重发超限:相关数据:{}", correlationDataStr);
               return;
           }
           //5.如果重试未超限,再次发送业务消息+相关数据  更新重试次数
           //5.1 更新重试次数
           gmallCorrelationData.setRetryCount(retryCount + 1);
           log.info("正在执行第:{}次重发消息。", retryCount + 1);
           Long ttl = redisTemplate.getExpire(key);
           redisTemplate.opsForValue().set(key, JSON.toJSONString(gmallCorrelationData), ttl, TimeUnit.SECONDS);
       
           //5.2 将发送失败消息再次进行重发
           rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(), gmallCorrelationData.getRoutingKey(), gmallCorrelationData.getMessage(), gmallCorrelationData);
       }
    }
    

测试: 修改路由键或交换机 -- 完美!

4.3 改造商品搜索上下架

4.3.1 service-list与service-product引入依赖与配置

分别在商品微服务搜索微服务模块导入以下依赖

<!--rabbitmq消息队列-->
<dependency>
   <groupId>com.atguigu.gmall</groupId>
   <artifactId>rabbit-util</artifactId>
   <version>1.0</version>
</dependency>

4.3.2 service-product发送消息

我在商品上架与商品添加时发送消息

商品上架业务实现类SkuManageServiceImpl

@Autowired
private RabbitService rabbitService;


/**
 * 上架商品
 *
 * @param skuId
 */
@Override
public void onSale(Long skuId) {
    //1.修改商品表中商品状态
    SkuInfo skuInfo = new SkuInfo(skuId);
    skuInfo.setIsSale(1);
    skuInfoService.updateById(skuInfo);

    //2.将上架商品skuId存入布隆过滤器
    RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER);
    bloomFilter.add(skuId);

    //2.基于MQ异步消息通知搜索服务新增索引库文档
    rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_UPPER, skuId);
}

商品下架

/**
 * 下架商品
 *
 * @param skuId
 */
@Override
public void cancelSale(Long skuId) {
    //1.修改商品表中商品状态  update sku_info set is_sale=? where id = ?
    LambdaUpdateWrapper<SkuInfo> updateWrapper = new LambdaUpdateWrapper<>();
    updateWrapper.set(SkuInfo::getIsSale, 0);
    updateWrapper.eq(SkuInfo::getId, skuId);
    skuInfoService.update(updateWrapper);


    //2.基于MQ异步消息通知搜索服务删除索引库文档
    rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_LOWER, skuId);
}

4.3.4 service-list消费消息

package com.atguigu.gmall.list.receiver;

import com.atguigu.gmall.list.service.SearchService;
import com.atguigu.gmall.rabbit.config.MqConst;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;

import java.io.IOException;

/**
 * 搜索服务监听器类
 *
 * @author: atguigu
 * @create: 2023-09-18 11:36
 */
@Slf4j
@Component
public class ListReceiver {


    @Autowired
    private SearchService searchService;

    /**
     * 监听商品上架消息,构建商品索引库文档对象并写入索引库
     * 每个消费者都需要考虑是否要做幂等性处理
     * @param skuId
     * @param channel
     * @param message
     */
    @SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS, durable = "true"),
            value = @Queue(value = MqConst.QUEUE_GOODS_UPPER, durable = "true"),
            key = MqConst.ROUTING_GOODS_UPPER
    ))
    public void processsGoodsUpper(Long skuId, Channel channel, Message message) {
        //1.执行业务
        if (skuId != null) {
            log.info("[搜索服务]商品上架监听:{}", skuId);
            searchService.upperGoods(skuId);
        }
        //2.执行手动应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }


    /**
     * 监听商品下架消息,删除索引库文档
     *
     * @param skuId
     * @param channel
     * @param message
     */
    @SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS, durable = "true"),
            value = @Queue(value = MqConst.QUEUE_GOODS_LOWER, durable = "true"),
            key = MqConst.ROUTING_GOODS_LOWER
    ))
    public void processsGoodsLower(Long skuId, Channel channel, Message message) {
        //1.执行业务
        if (skuId != null) {
            log.info("[搜索服务]商品下架监听:{}", skuId);
            searchService.lowerGoods(skuId);
        }
        //2.执行手动应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

4.3.5 测试

启动后台管理页面

http://localhost:8888/#/product/sku/list

操作商品的上架,下架。动态更改es中的数据。

可以通过http://192.168.200.128:5601/app/kibana#/dev_tools/console?_g=() 观察功能是否实现!

5、延迟消息

延迟消息有两种实现方案:

1,基于死信队列

2,集成延迟插件

5.1 基于死信实现延迟消息

使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现延迟队列

5.1.1 消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

如何设置TTL:

我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消失。

5.1.2 死信交换机 Dead Letter Exchanges

一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

(1) 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

(2)上面的消息的TTL到了,消息过期了。

(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

img

我们现在可以测试一下延迟队列。

(1)创建死信队列

(2)创建交换机

(3)建立交换器与队列之间的绑定

(4)创建队列

5.1.3 代码实现

5.1.3.1 在service-mq 中添加配置类

package com.atguigu.gmall.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 基于死信实现延迟消息:注册交换机跟队列完成绑定
 * 配置类不用记,直接拷贝
 *
 * @author: atguigu
 * @create: 2023-09-18 14:27
 */
@Configuration
public class DeadLetterConfig {


    public static final String exchange_dead = "exchange.dead";
    public static final String routing_dead_1 = "routing.dead.1";
    public static final String routing_dead_2 = "routing.dead.2";
    public static final String queue_dead_1 = "queue.dead.1";
    public static final String queue_dead_2 = "queue.dead.2";


    /**
     * 注册交换机对象
     *
     * @return
     */
    @Bean
    public DirectExchange exchange() {

        //p1:交换机名称 p2:是否持久化 p3:自动删除(删除条件:当没有队列跟交换机绑定) p4:交换机额外参数
        return new DirectExchange(exchange_dead, true, false);
    }

    /**
     * 注册队列对象1:设置队列TTL;设置超时后发送消息指定交换机名称跟路由键名称
     */
    @Bean
    public Queue queue1() {
        //1.只对首次声明它的连接(Connection)可见 2.会在其连接断开的时候自动删除
        //p1:队列名称 p2:是否持久化 p3:排他性队列 p4:自动删除(删除条件:当没有交换机跟队列绑定) p5:额外参数
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-message-ttl", 20000);
        args.put("x-dead-letter-exchange", exchange_dead);
        args.put("x-dead-letter-routing-key", routing_dead_2);
        return new Queue(queue_dead_1, true, false, false, args);
    }

    /**
     * 进行交换机跟队列1绑定
     */
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);
    }

    //注册队列对象2
    @Bean
    public Queue queue2(){
        //1.只对首次声明它的连接(Connection)可见 2.会在其连接断开的时候自动删除
        //p1:队列名称 p2:是否持久化 p3:排他性队列 p4:自动删除(删除条件:当没有交换机跟队列绑定) p5:额外参数
        return new Queue(queue_dead_2, true, false, false);
    }

    //进行交换机跟队列2绑定
    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);
    }

}

5.1.3.2 配置发送消息

service-mq模块 ProducerController


/**
 * 基于死信实现延迟消息
 *
 * @param data
 * @return
 */
@GetMapping("/sendDeadLetterMsg")
public Result sendDeadLetterMsg(@RequestParam("data") String data) {
    rabbitService.sendMessage(DeadLetterConfig.exchange_dead, DeadLetterConfig.routing_dead_1, data);
    log.info("基于死信-发送延迟消息成功");
    return Result.ok();
}

5.1.3.3 消息接收方

service-mq模块

package com.atguigu.gmall.mq.receiver;

import com.atguigu.gmall.mq.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * @author: atguigu
 * @create: 2023-09-16 15:41
 */
@Slf4j
@Component
public class DeadLetterMsgReceiver {


    /**
     * @param data
     * @param channel 信道
     * @param message 消息对象
     */
    @SneakyThrows
    @RabbitListener(queues = DeadLetterConfig.queue_dead_2)
    public void receiverMessage(String data, Channel channel, Message message) {
        if (StringUtils.isNotBlank(data)) {
            log.info("[消费者]监听到基于死信实现的延迟消息:{}", data);
        }
        //默认消费者:Broker服务器只负责将消息投递,无论消费者业务成功或者失败,Broker自动将消息删除
        //消费者端进行手动应答:Broker服务器投递消息后,一致等待消费者进行应答,Broker收到应答才会将MQ队列中消息删除
        //basicAck业务正常执行完毕 p1:消息唯一标识从1开始递增 p2:是否批量确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        //basicNack业务非正常执行-进行重试(设置上限) p1:消息标识 p2:是否批量确认消息 p3:是否重新入队
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

        //basicReject消费者明确拒绝接受消息,被拒绝消息会进入死信队列 p1:消息标识 p3:是否重新入队
        //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }

}

5.2 基于延迟插件实现延迟消息

Rabbitmq实现了一个插件x-delay-message来实现延时队列

5.2.1 插件安装

  1. 首先我们将刚下载下来的rabbitmq_delayed_message_exchange-3.9.0.ez文件上传到RabbitMQ所在服务器,下载地址:https://www.rabbitmq.com/community-plugins.html

  2. 切换到插件所在目录,执行命令,将刚插件拷贝到容器内plugins目录下

    docker cp rabbitmq_delayed_message_exchange-3.9.0.ez gmalldocker_rabbitmq_1:/plugins
    
  3. 执行 docker exec -it gmalldocker_rabbitmq_1 /bin/bash 命令进入到容器内部,并 cd plugins 进入plugins目录

  4. 执行 ls -l|grep delay 命令查看插件是否copy成功

  5. 在容器内plugins目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启用插件

  6. exit命令退出RabbitMQ容器内部,然后执行 docker restart gmalldocker_rabbitmq_1 命令重启RabbitMQ容器

5.2.2 代码实现

service-mq 中添加类,配置队列

  package com.atguigu.gmall.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedMqConfig {

    public static final String exchange_delay = "exchange.delay";
    public static final String routing_delay = "routing.delay";
    public static final String queue_delay_1 = "queue.delay.1";

    @Bean
    public Queue delayQeue1() {
        // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
        return new Queue(queue_delay_1, true);
    }

    /**
     * 声明延迟类型交换机
     * 前提:必须要保证MQ配置开启延迟插件
     * @return
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding delayBbinding1() {
        return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();
    }
}

ProducerController:发送消息

/**
 * 基于延迟插件实现延迟消息
 *
 * @param data
 * @return
 */
@GetMapping("/sendDelayMsg")
public Result sendDelayMsg(@RequestParam("data") String data, @RequestParam("delayTime") int delayTime) {
    rabbitService.sendDelayMessage(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, data, delayTime);
    log.info("基于延迟插件-发送延迟消息成功");
    return Result.ok();
}

rabbit-utilRabbitService中封装发送延迟消息方法,队列确认方法中增加延迟队列判断

/***
 * 发送延迟消息方法
 * @param exchange
 * @param routingKey
 * @param msg
 * @param dealyTime 延迟时间,单位设置为:秒
 */
public void sendDelayMessage(String exchange, String routingKey, Object msg, Integer dealyTime) {
    //1.发送消息同时设置相关数据
    GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();
    String id = UUID.randomUUID().toString().replaceAll("-", "");
    gmallCorrelationData.setId(id);
    gmallCorrelationData.setExchange(exchange);
    gmallCorrelationData.setRoutingKey(routingKey);
    gmallCorrelationData.setMessage(msg);
    gmallCorrelationData.setDelay(true);
    gmallCorrelationData.setDelayTime(dealyTime);

    //2.将相关数据存入Redis,设置5分钟有效期
    String key = "mq:" + id;
    String correlationDataStr = JSON.toJSONString(gmallCorrelationData);
    redisTemplate.opsForValue().set(key, correlationDataStr, 5, TimeUnit.MINUTES);

    //3.执行发送消息-设置相关数据
    rabbitTemplate.convertAndSend(exchange, routingKey, msg, (message -> {
        //设置消息过期时间
        message.getMessageProperties().setDelay(dealyTime * 1000);
        return message;
    }), gmallCorrelationData);

}

MQProducerAckConfig队列确认增加延迟消息判断

/**
 * 当生产者发送消息异常,在人工介入前程序自动进行重试
 *
 * @param id 相关数据标识,根据该标识获取存在Redis中相关数据信息
 */
private void retryMessage(String id) {
    //1.构建相关数据Key
    String key = "mq:" + id;

    //2.查询Redis中存放相关数据-注意该数据存采用Strin    g
    String correlationDataStr = (String) redisTemplate.opsForValue().get(key);
    GmallCorrelationData gmallCorrelationData = JSON.parseObject(correlationDataStr, GmallCorrelationData.class);

    //重发对于延迟消息处理方式一:忽略延迟消息
    if (gmallCorrelationData.isDelay()) {
        return;
    }
    //3.判断重试是否超限 阈值:3次
    int retryCount = gmallCorrelationData.getRetryCount();
    if (retryCount > 2) {
        //4.如果重试超限,写入生产者消息异常表(将来由人工处理)
        log.error("[RabbitMQ]消息重发超限:相关数据:{}", correlationDataStr);
        return;
    }
    //5.如果重试未超限,再次发送业务消息+相关数据  更新重试次数
    //5.1 更新重试次数
    gmallCorrelationData.setRetryCount(retryCount + 1);
    log.info("正在执行第:{}次重发消息。", retryCount + 1);
    Long ttl = redisTemplate.getExpire(key);
    redisTemplate.opsForValue().set(key, JSON.toJSONString(gmallCorrelationData), ttl, TimeUnit.SECONDS);

    //5.2 将发送失败消息再次进行重发
    if (gmallCorrelationData.isDelay()) {
        //延迟消息重发
        rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(), gmallCorrelationData.getRoutingKey(), gmallCorrelationData.getMessage(), message -> {
            message.getMessageProperties().setDelay(gmallCorrelationData.getDelayTime() * 1000);
            return message;
        }, gmallCorrelationData);
    } else {
        //普通消息重发
        rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(), gmallCorrelationData.getRoutingKey(), gmallCorrelationData.getMessage(), gmallCorrelationData);
    }
}

接收消息,消费者端判断是否需要做幂等性处理

package com.atguigu.gmall.mq.receiver;

import com.atguigu.gmall.mq.config.DeadLetterConfig;
import com.atguigu.gmall.mq.config.DelayedMqConfig;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;


/**
 * @author: atguigu
 * @create: 2023-09-16 15:41
 */
@Slf4j
@Component
public class DelayMsgReceiver {


    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * @param data
     * @param channel 信道
     * @param message 消息对象
     */
    @SneakyThrows
    @RabbitListener(queues = DelayedMqConfig.queue_delay_1)
    public void receiverMessage(String data, Channel channel, Message message) {
        if (StringUtils.isNotBlank(data)) {
            log.info("[消费者]监听到基于延迟插件实现的延迟消息:{}", data);

            //判断当前消费者业务端是否需要满足幂等性 ,只处理第一次消息 1.利用MySQL唯一约束 2.利用Redis set nx
            //1.获取消息中唯一业务标识,例如订单id,编号。保证消息幂等性
            String key = "mq:" + data;

            //2.将业务标识存入Redis 设置过期时间
            Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, data, 5, TimeUnit.MINUTES);
            if (!flag) {
                //该项业务数据已被执行
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
            log.info("业务执行...................");
        }
        //默认消费者:Broker服务器只负责将消息投递,无论消费者业务成功或者失败,Broker自动将消息删除
        //消费者端进行手动应答:Broker服务器投递消息后,一致等待消费者进行应答,Broker收到应答才会将MQ队列中消息删除
        //basicAck业务正常执行完毕 p1:消息唯一标识从1开始递增 p2:是否批量确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        //basicNack业务非正常执行-进行重试(设置上限) p1:消息标识 p2:是否批量确认消息 p3:是否重新入队
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

        //basicReject消费者明确拒绝接受消息,被拒绝消息会进入死信队列 p1:消息标识 p3:是否重新入队
        //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }

}

消费结果会发送三次,也被消费三次!

如何保证消息幂等性?

  1. 使用数据库方式

  2. 使用redis setnx 命令解决(推荐)

5.3 基于延迟插件实现取消订单

service-order模块

5.3.1 业务配置与接口封装

rabbit-util模块已配置常量MqConst

/**
 * 取消订单,发送延迟队列
 */
public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "exchange.direct.order.cancel";//"exchange.direct.order.create" test_exchange;
public static final String ROUTING_ORDER_CANCEL = "order.create";
//延迟取消订单队列
public static final String QUEUE_ORDER_CANCEL  = "queue.order.cancel";
//取消订单 延迟时间 单位:秒
public static final int DELAY_TIME  = 10;

5.3.2 改造订单service-order模块

service-order模块添加依赖

<!--rabbitmq工具模块-->
<dependency>
    <groupId>com.atguigu.gmall</groupId>
    <artifactId>rabbit-util</artifactId>
    <version>1.0</version>
</dependency>

配置队列

package com.atguigu.gmall.order.config;

import com.atguigu.gmall.rabbit.config.MqConst;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 创建取消订单延迟关闭订单需要交换机跟队列完成绑定
 */
@Configuration
public class CancelOrderBindingConfig {

    @Bean
    public Queue cancelOrderQueue() {
        // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
        return new Queue(MqConst.QUEUE_ORDER_CANCEL, true);
    }

    /**
     * 声明延迟类型交换机
     * 前提:必须要保证MQ配置开启延迟插件
     * @return
     */
    @Bean
    public CustomExchange cancelOrderExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(MqConst.EXCHANGE_DIRECT_ORDER_CANCEL, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding delayBbinding1() {
        return BindingBuilder.bind(cancelOrderQueue()).to(cancelOrderExchange()).with(MqConst.ROUTING_ORDER_CANCEL).noargs();
    }
}

5.3.3 发送消息

创建订单时,发送延迟消息

修改保存订单方法submitOrder

@Autowired
private RabbitService rabbitService;


/**
 * 提交保存订单
 *
 * @param orderInfo
 * @param tradeNo
 * @return
 */
@Override
public Long submitOrder(OrderInfo orderInfo, String tradeNo) {

    //6.TODO 商城订单采用延迟消息方式-检查订单&关闭订单
    //TODO 测试设置为20秒延迟 真实延迟时间跟订单设置过期时间 动态计算出来
    Date expireTime = orderInfo.getExpireTime();
    Date now = new Date();
    Long ttl = (expireTime.getTime() - now.getTime())/1000;
    rabbitService.sendDelayMessage(MqConst.EXCHANGE_DIRECT_ORDER_CANCEL, MqConst.ROUTING_ORDER_CANCEL, orderId, ttl.intValue());

    //TODO 订单提交后购物车数据清理,开发阶段不清理
    return orderId;
}

5.3.4 接收消息

package com.atguigu.gmall.order.reciever;

import com.atguigu.gmall.order.service.OrderInfoService;
import com.atguigu.gmall.rabbit.config.MqConst;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: atguigu
 * @create: 2023-09-18 16:00
 */
@Slf4j
@Component
public class OrderReceiver {


    @Autowired
    private OrderInfoService orderInfoService;


    /**
     * 延迟关闭订单消费者
     *
     * @param orderId 订单ID
     * @param channel
     * @param message
     */
    @SneakyThrows
    @RabbitListener(queues = MqConst.QUEUE_ORDER_CANCEL)
    public void processCloseOrder(Long orderId, Channel channel, Message message) {
        //1.业务处理 可以不处理幂等
        if (orderId != null) {
            log.info("[订单服务]监听到延迟关单消息:{}", orderId);
            orderInfoService.execCloseOrder(orderId);
        }

        //2.手动应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

5.3.5 编写取消订单接口与实现类

OrderInfoService

/**
 * 验证订单支付状态且关闭订单
 *
 * @param orderId
 */
void execCloseOrder(Long orderId);


/**
 * 将指定订单改为指定状态
 *
 * @param orderId
 */
void updateOrderStatus(Long orderId, ProcessStatus processStatus);

OrderInfoServiceImpl

/**
 * 根据订单ID查询订单支付状态,如果订单状态为:未支付,将其改为关闭
 *
 * @param orderId
 */
@Override
@Transactional(rollbackFor = Exception.class)
public void execCloseOrder(Long orderId) {
    //1.根据订单ID查询订单信息
    OrderInfo orderInfo = this.getById(orderId);

    //2.判断订单状态
    if (orderId != null && PaymentStatus.UNPAID.name().equals(orderInfo.getOrderStatus())) {
        this.updateOrderStatus(orderId, ProcessStatus.CLOSED);
    }
}


/**
 * 根据订单ID将订单改为指定状态
 *
 * @param orderId
 * @param processStatus
 */
@Override
public void updateOrderStatus(Long orderId, ProcessStatus processStatus) {
    OrderInfo orderInfo = new OrderInfo();
    orderInfo.setId(orderId);
    orderInfo.setOrderStatus(processStatus.getOrderStatus().name());
    orderInfo.setProcessStatus(processStatus.name());
    this.updateById(orderInfo);
}