学习目标:
我们思考一下,是否存在问题?
商品的原始数据保存在数据库中,增删改查都在数据库中完成
搜索服务数据来源是索引库,如果数据库商品发生变化,索引库数据不能及时更新
如果我们在后台修改了商品的价格,搜索页面依然是旧的价格,这样显然不对。该如何解决?
这里有两种解决方案:
方案1:每当后台对商品做增删改操作,同时要修改索引库数据
方案2:搜索服务对外提供操作接口,后台在商品增删改后,调用接口
以上两种方式都有同一个严重问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。
所以,我们会通过另外一种方式来解决这个问题:消息队列
用户下单后,如果1个小时未支付,我们该如何取消订单
方案1:定时任务,定时扫描未支付订单,超过2小时自动关闭
方案2:使用延迟队列关闭订单
如:用户支付订单,我们如何保证更新订单状态与扣减库存 ,三个服务数据最终一致!
消息队列都解决了什么问题?
ActiveMQ:基于JMS(Java Message Service)协议,java语言,jdk
RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
Kafka:分布式消息系统,高吞吐量
Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列(不存消息)
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它的作用就是把 exchange和 queue按照路由规则绑定起来
Routing Key:路由关键字, exchange根据这个关键字进行消息投递
vhost:虚拟主机,一个 broker里可以开设多个 vhost,用作不同用户的权限分离
producer:消息生产者,就是投递消息的程序
consumer:消息消费者,就是接受消息的程序
channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel代表一个会话任务
看电商软件环境安装.doc
访问IP地址:http://192.168.200.128:15672
用户名:admin
密 码:admin
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
基本消息模型:生产者–>队列–>消费者
work消息模型:生产者–>队列–>多个消费者竞争消费
订阅模型-Fanout:广播模式,将消息交给所有绑定到交换机的队列,每个消费者都会收到同一条消息
订阅模型-Direct:定向,把消息交给符合指定 rotingKey 的队列
订阅模型-Topic 主题模式:通配符,把消息交给符合routing pattern(路由模式) 的队列
我们项目使用的是第四种!
在gmall_service
模块下新建模块:service-mq 作用:类似某个业务微服务,发送消息(代码可以复用-抽取到Rabbit-util模块),监听消息(每个业务模块不同)
package com.atguigu.gmall;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源自动配置
@EnableDiscoveryClient
public class MqDemoApp {
public static void main(String[] args) {
SpringApplication.run(MqDemoApp.class, args);
}
}
在resources
目录下新建配置文件:bootstrap.properties
spring.application.name=service-mq
spring.profiles.active=dev
spring.cloud.nacos.discovery.server-addr=192.168.200.128:8848
spring.cloud.nacos.config.server-addr=192.168.200.128:8848
spring.cloud.nacos.config.prefix=${spring.application.name}
spring.cloud.nacos.config.file-extension=yaml
spring.cloud.nacos.config.shared-configs[0].data-id=common.yaml
Nacos配置中心提供common.yaml
配置文件,说明:rabbitmq默认端口5672
消息的不丢失,在MQ角度考虑,一般有三种途径:
生产者不丢数据
MQ服务器不丢数据
消费者不丢数据
保证消息不丢失有两种实现方式:
开启事务模式
消息息确认模式(生产者,消费者)
说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式
如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化
Exchange
声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)
Queue
声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)
message
发送消息时通过设置deliveryMode=2持久化消息
有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。
有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?
要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack
开启rabbitmq消息确认配置,在common的配置文件中都已经配置好了!
spring:
rabbitmq:
host: 192.168.200.128
port: 5672
username: admin
password: admin
publisher-confirm-type: correlated #交换机的确认 异步回调ConfirmCallback
publisher-returns: true # 队列的确认 异步回调ReturnCallback
listener:
simple:
acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
由于消息队列是公共模块,我们把mq的相关代码(生产者)封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可
gmall-common
模块下新增模块:rabbit-util 。搭建方式如common-utilpom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gmall-common</artifactId>
<groupId>com.atguigu.gmall</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbit-util</artifactId>
<dependencies>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
在rabbit-util
中提供常量类 MqConst
package com.atguigu.gmall.common.rabbit.config;
public class MqConst {
/**
* 消息补偿
*/
public static final String MQ_KEY_PREFIX = "mq:list";
public static final int RETRY_COUNT = 3;
/**
* 商品上下架
*/
public static final String EXCHANGE_DIRECT_GOODS = "exchange.direct.goods";
public static final String ROUTING_GOODS_UPPER = "goods.upper";
public static final String ROUTING_GOODS_LOWER = "goods.lower";
//队列
public static final String QUEUE_GOODS_UPPER = "queue.goods.upper";
public static final String QUEUE_GOODS_LOWER = "queue.goods.lower";
/**
* 取消订单,发送延迟队列
*/
public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "exchange.direct.order.cancel";//"exchange.direct.order.create" test_exchange;
public static final String ROUTING_ORDER_CANCEL = "order.create";
//延迟取消订单队列
public static final String QUEUE_ORDER_CANCEL = "queue.order.cancel";
//取消订单 延迟时间 单位:秒 真实业务
public static final int DELAY_TIME = 24*60*60;
// 测试取消订单
// public static final int DELAY_TIME = 3;
/**
* 订单支付
*/
public static final String EXCHANGE_DIRECT_PAYMENT_PAY = "exchange.direct.payment.pay";
public static final String ROUTING_PAYMENT_PAY = "payment.pay";
//队列
public static final String QUEUE_PAYMENT_PAY = "queue.payment.pay";
/**
* 减库存
*/
public static final String EXCHANGE_DIRECT_WARE_STOCK = "exchange.direct.ware.stock";
public static final String ROUTING_WARE_STOCK = "ware.stock";
//队列
public static final String QUEUE_WARE_STOCK = "queue.ware.stock";
/**
* 减库存成功,更新订单状态
*/
public static final String EXCHANGE_DIRECT_WARE_ORDER = "exchange.direct.ware.order";
public static final String ROUTING_WARE_ORDER = "ware.order";
//队列
public static final String QUEUE_WARE_ORDER = "queue.ware.order";
/**
* 关闭交易
*/
public static final String EXCHANGE_DIRECT_PAYMENT_CLOSE = "exchange.direct.payment.close";
public static final String ROUTING_PAYMENT_CLOSE = "payment.close";
//队列
public static final String QUEUE_PAYMENT_CLOSE = "queue.payment.close";
/**
* 定时任务
*/
public static final String EXCHANGE_DIRECT_TASK = "exchange.direct.task";
public static final String ROUTING_TASK_1 = "seckill.task.1";
//队列
public static final String QUEUE_TASK_1 = "queue.task.1";
/**
* 秒杀
*/
public static final String EXCHANGE_DIRECT_SECKILL_USER = "exchange.direct.seckill.user";
public static final String ROUTING_SECKILL_USER = "seckill.user";
//队列
public static final String QUEUE_SECKILL_USER = "queue.seckill.user";
/**
* 定时任务
*/
public static final String ROUTING_TASK_18 = "seckill.task.18";
//队列
public static final String QUEUE_TASK_18 = "queue.task.18";
/**
* 秒杀减库存
*/
public static final String EXCHANGE_DIRECT_SECKILL_STOCK = "exchange.direct.seckill.stock";
public static final String ROUTING_SECKILL_STOCK = "seckill.stock";
//队列
public static final String QUEUE_SECKILL_STOCK = "queue.seckill.stock";
}
在rabbit-util
中添加类
package com.atguigu.gmall.rabbit.config;
import com.atguigu.gmall.rabbit.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* 对生产者使用模板对象进行设置生产者回调
* 1.在配置文件中开启生产者确认
* publisher-confirm-type: correlated #设置交换机异步回调
* publisher-returns: true #开启队列异常回调
* 2.在代码中对模板对象RabbitTemplate设置两个回调方法
* 交换机回调(包含正常,异常情况)
* 队列异常回调(交换机路由消息失败)
*
* @author: atguigu
* @create: 2023-08-11 14:08
*/
@Slf4j
@Configuration
public class MQProducerConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 项目启动后会自动执行该方法
*/
@PostConstruct
public void init() {
log.info("项目启动后,进行了RabbitTemplate模板对象设置回调");
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 交换机确认:当生产者发送消息正常到达交换机,一级交换机未收到消息,都会执行该回调方法
*
* @param correlationData 相关数据 当生产者发送时设置“相关数据”才会有值
* @param ack true:消息正常发送交换机 false:消息发送到交换机失败
* @param cause 当消息发送交换机失败,包含失败信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("[生产者-交换机正常确认]发送消息到交换机成功");
} else {
//当消息发送交换机失败,进行重试
log.error("[生产者-交换机异常确认]发送消息到交换机失败:{}", cause);
}
}
/**
* 队列确认:只有当交换机路由消息到队列失败,才会走队列确认回调方法
*
* @param message 回调消息对象
* @param replyCode 应答码
* @param replyText 应答文本.
* @param exchange 交换机名称.
* @param routingKey 路由键.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("[生产者-队列异常确认],消息路由queue失败,应答码={},原因={},交换机={},路由键={},消息={},进行生产者消息重发", replyCode, replyText, exchange, routingKey, message.toString());
}
}
在rabbit-util
中添加类
package com.atguigu.gmall.rabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 所有业务模块需要发送消息工具类
*
* @author: atguigu
* @create: 2023-09-16 15:27
*/
@Component
public class RabbitService {
@Autowired
private RabbitTemplate rabbitTemplate;
/***
* 发送普通消息方法
* @param exchange
* @param routingKey
* @param msg
*/
public void sendMessage(String exchange, String routingKey, Object msg) {
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
}
在service-mq
引入rabbit-util
模块依赖
<dependencies>
<dependency>
<groupId>com.atguigu.gmall</groupId>
<artifactId>rabbit-util</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
在service-mq
编写测试代码
消息发送端
package com.atguigu.gmall.mq.controller;
import com.atguigu.gmall.common.result.Result;
import com.atguigu.gmall.rabbit.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: atguigu
* @create: 2023-09-16 15:32
*/
@RestController
@RequestMapping("/api/mq")
@Slf4j
public class ProducerController {
@Autowired
private RabbitService rabbitService;
/**
* 生产者发送消息
*
* @param data
* @return
*/
@GetMapping("/sendNormalMsg")
public Result sendNormalMsg(@RequestParam("data") String data) {
String exchange = "exchange.confirm";
String routingKey = "routing.confirm";
rabbitService.sendMessage(exchange, routingKey, data);
log.info("发送消息成功");
return Result.ok();
}
}
消息接收端
在service-mq 中编写
package com.atguigu.gmall.mq.receiver;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author: atguigu
* @create: 2023-09-16 15:41
*/
@Slf4j
@Component
public class NormalMsgReceiver {
/**
* @param data
* @param channel 信道
* @param message 消息对象
* @RabbitListener:如果交换机跟队列已创建且完成绑定,只需要设置监听队列名称即可 queues = ""
* @RabbitListener:如果交换机跟队列已都没有,扫描到指定交换机,队列,自动创建且完成绑定 消费者:负责监听消息
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange("exchange.confirm"),
value = @Queue("queue.confirm"),
key = "routing.confirm"
))
public void receiverMessage(String data, Channel channel, Message message) {
if (StringUtils.isNotBlank(data)) {
log.info("[消费者]监听到普通消息:{}", data);
System.out.println("模拟处理XXX业务");
}
//默认消费者:Broker服务器只负责将消息投递,无论消费者业务成功或者失败,Broker自动将消息删除
//消费者端进行手动应答:Broker服务器投递消息后,一致等待消费者进行应答,Broker收到应答才会将MQ队列中消息删除
//basicAck业务正常执行完毕 p1:消息唯一标识从1开始递增 p2:是否批量确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//basicNack业务非正常执行-进行重试(设置上限) p1:消息标识 p2:是否批量确认消息 p3:是否重新入队
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//basicReject消费者明确拒绝接受消息,被拒绝消息会进入死信队列 p1:消息标识 p3:是否重新入队
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
测试:http://localhost:8282/api/mq/sendNormalMsg
实现思路:借助redis来实现重发机制
在rabbit-util
模块中添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
rabbit-util
模块中自定义一个实体类来接收消息
package com.atguigu.gmall.common.model;
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CorrelationData;
@Data
public class GmallCorrelationData extends CorrelationData {
// 默认有一个消息唯一表示的Id private volatile String id;
// 消息主体
private Object message;
// 交换机
private String exchange;
// 路由键
private String routingKey;
// 重试次数
private int retryCount = 0;
// 消息类型 是否是延迟消息
private boolean isDelay = false;
// 延迟时间
private int delayTime = 10;
}
修改rabbit-util
中RabbitService
中发送方法:sendMessage() 修改发送方法
package com.atguigu.gmall.rabbit.service;
import com.alibaba.fastjson.JSON;
import com.atguigu.gmall.rabbit.model.GmallCorrelationData;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* 所有业务模块需要发送消息工具类
*
* @author: atguigu
* @create: 2023-09-16 15:27
*/
@Component
public class RabbitService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate redisTemplate;
/***
* 发送普通消息方法
* @param exchange
* @param routingKey
* @param msg
*/
public void sendMessage(String exchange, String routingKey, Object msg) {
//1.发送消息同时设置相关数据
GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();
String id = UUID.randomUUID().toString().replaceAll("-", "");
gmallCorrelationData.setId(id);
gmallCorrelationData.setExchange(exchange);
gmallCorrelationData.setRoutingKey(routingKey);
gmallCorrelationData.setMessage(msg);
//2.将相关数据存入Redis,设置5分钟有效期
String key = "mq:" + id;
String correlationDataStr = JSON.toJSONString(gmallCorrelationData);
redisTemplate.opsForValue().set(key, correlationDataStr, 5, TimeUnit.MINUTES);
//3.执行发送消息-设置相关数据
rabbitTemplate.convertAndSend(exchange, routingKey, msg, gmallCorrelationData);
}
}
发送失败调用重发方法 MQProducerAckConfig 类中修改confirm方法
package com.atguigu.gmall.rabbit.config;
import com.alibaba.fastjson.JSON;
import com.atguigu.gmall.rabbit.model.GmallCorrelationData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.awt.*;
import java.util.concurrent.TimeUnit;
/**
* MQProducerConfig类产生对象:既是ConfirmCallback接口类型 又是ReturnCallback接口类型
*
* @author: atguigu
* @create: 2023-09-18 09:15
*/
@Slf4j
@Component
public class MQProducerConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate redisTemplate;
/**
* 项目启动成功后(RabbitTemplate对象在IOC容器中)自动执行一次
*/
@PostConstruct
public void init() {
//设置交换机确认回调
rabbitTemplate.setConfirmCallback(this);
//设置队列确认异常回调
rabbitTemplate.setReturnCallback(this);
log.info("[RabbitMQ]生产者,项目启动后执行生产者确认设置");
}
/**
* 生产者确认回调-交换机回调(正常,异常)
*
* @param correlationData 相关数据,只有当调用模板对象发送业务消息同时设置相关数据
* @param ack true:正常, false:异常
* @param cause 异常原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("交换机异常回调,原因:{},进入到重试逻辑。", cause);
if (correlationData != null && StringUtils.isNotBlank(correlationData.getId())) {
this.retryMessage(correlationData.getId());
}
}
}
/**
* 生产者确认回调-队列确认异常。只要交换机路由消息到队列失败就会触发该方法回调
*
* @param message MQ消息对象.
* @param replyCode 异常码.
* @param replyText 异常消息.
* @param exchange 交换机名称.
* @param routingKey 路由键名称.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("[生产者-队列异常确认],消息路由queue失败,应答码={},原因={},交换机={},路由键={},MQ消息={},进入到重试逻辑.", replyCode, replyText, exchange, routingKey, message.toString());
//1.从MQ消息对象中属性信息中获取头信息 spring_returned_message_correlation -> 25fc60a2aa2d4981a6aaa140a0be5ec6
String id = message.getMessageProperties().getHeader("spring_returned_message_correlation");
if (StringUtils.isNotBlank(id)) {
//2.进行重试
this.retryMessage(id);
}
}
/**
* 当生产者发送消息异常,在人工介入前程序自动进行重试
*
* @param id 相关数据标识,根据该标识获取存在Redis中相关数据信息
*/
private void retryMessage(String id) {
//1.构建相关数据Key
String key = "mq:" + id;
//2.查询Redis中存放相关数据-注意该数据存采用Strin g
String correlationDataStr = (String) redisTemplate.opsForValue().get(key);
GmallCorrelationData gmallCorrelationData = JSON.parseObject(correlationDataStr, GmallCorrelationData.class);
//3.判断重试是否超限 阈值:3次
int retryCount = gmallCorrelationData.getRetryCount();
if (retryCount > 2) {
//4.如果重试超限,写入生产者消息异常表(将来由人工处理)
log.error("[RabbitMQ]消息重发超限:相关数据:{}", correlationDataStr);
return;
}
//5.如果重试未超限,再次发送业务消息+相关数据 更新重试次数
//5.1 更新重试次数
gmallCorrelationData.setRetryCount(retryCount + 1);
log.info("正在执行第:{}次重发消息。", retryCount + 1);
Long ttl = redisTemplate.getExpire(key);
redisTemplate.opsForValue().set(key, JSON.toJSONString(gmallCorrelationData), ttl, TimeUnit.SECONDS);
//5.2 将发送失败消息再次进行重发
rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(), gmallCorrelationData.getRoutingKey(), gmallCorrelationData.getMessage(), gmallCorrelationData);
}
}
测试: 修改路由键或交换机 -- 完美!
分别在商品微服务
跟搜索微服务
模块导入以下依赖
<!--rabbitmq消息队列-->
<dependency>
<groupId>com.atguigu.gmall</groupId>
<artifactId>rabbit-util</artifactId>
<version>1.0</version>
</dependency>
我在商品上架与商品添加时发送消息
商品上架业务实现类SkuManageServiceImpl
@Autowired
private RabbitService rabbitService;
/**
* 上架商品
*
* @param skuId
*/
@Override
public void onSale(Long skuId) {
//1.修改商品表中商品状态
SkuInfo skuInfo = new SkuInfo(skuId);
skuInfo.setIsSale(1);
skuInfoService.updateById(skuInfo);
//2.将上架商品skuId存入布隆过滤器
RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER);
bloomFilter.add(skuId);
//2.基于MQ异步消息通知搜索服务新增索引库文档
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_UPPER, skuId);
}
商品下架
/**
* 下架商品
*
* @param skuId
*/
@Override
public void cancelSale(Long skuId) {
//1.修改商品表中商品状态 update sku_info set is_sale=? where id = ?
LambdaUpdateWrapper<SkuInfo> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.set(SkuInfo::getIsSale, 0);
updateWrapper.eq(SkuInfo::getId, skuId);
skuInfoService.update(updateWrapper);
//2.基于MQ异步消息通知搜索服务删除索引库文档
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_LOWER, skuId);
}
package com.atguigu.gmall.list.receiver;
import com.atguigu.gmall.list.service.SearchService;
import com.atguigu.gmall.rabbit.config.MqConst;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
import java.io.IOException;
/**
* 搜索服务监听器类
*
* @author: atguigu
* @create: 2023-09-18 11:36
*/
@Slf4j
@Component
public class ListReceiver {
@Autowired
private SearchService searchService;
/**
* 监听商品上架消息,构建商品索引库文档对象并写入索引库
* 每个消费者都需要考虑是否要做幂等性处理
* @param skuId
* @param channel
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS, durable = "true"),
value = @Queue(value = MqConst.QUEUE_GOODS_UPPER, durable = "true"),
key = MqConst.ROUTING_GOODS_UPPER
))
public void processsGoodsUpper(Long skuId, Channel channel, Message message) {
//1.执行业务
if (skuId != null) {
log.info("[搜索服务]商品上架监听:{}", skuId);
searchService.upperGoods(skuId);
}
//2.执行手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* 监听商品下架消息,删除索引库文档
*
* @param skuId
* @param channel
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS, durable = "true"),
value = @Queue(value = MqConst.QUEUE_GOODS_LOWER, durable = "true"),
key = MqConst.ROUTING_GOODS_LOWER
))
public void processsGoodsLower(Long skuId, Channel channel, Message message) {
//1.执行业务
if (skuId != null) {
log.info("[搜索服务]商品下架监听:{}", skuId);
searchService.lowerGoods(skuId);
}
//2.执行手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
启动后台管理页面
http://localhost:8888/#/product/sku/list
操作商品的上架,下架。动态更改es中的数据。
可以通过http://192.168.200.128:5601/app/kibana#/dev_tools/console?_g=() 观察功能是否实现!
延迟消息有两种实现方案:
1,基于死信队列
2,集成延迟插件
使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现延迟队列
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如何设置TTL:
我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消失。
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
(1) 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
(2)上面的消息的TTL到了,消息过期了。
(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
我们现在可以测试一下延迟队列。
(1)创建死信队列
(2)创建交换机
(3)建立交换器与队列之间的绑定
(4)创建队列
package com.atguigu.gmall.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 基于死信实现延迟消息:注册交换机跟队列完成绑定
* 配置类不用记,直接拷贝
*
* @author: atguigu
* @create: 2023-09-18 14:27
*/
@Configuration
public class DeadLetterConfig {
public static final String exchange_dead = "exchange.dead";
public static final String routing_dead_1 = "routing.dead.1";
public static final String routing_dead_2 = "routing.dead.2";
public static final String queue_dead_1 = "queue.dead.1";
public static final String queue_dead_2 = "queue.dead.2";
/**
* 注册交换机对象
*
* @return
*/
@Bean
public DirectExchange exchange() {
//p1:交换机名称 p2:是否持久化 p3:自动删除(删除条件:当没有队列跟交换机绑定) p4:交换机额外参数
return new DirectExchange(exchange_dead, true, false);
}
/**
* 注册队列对象1:设置队列TTL;设置超时后发送消息指定交换机名称跟路由键名称
*/
@Bean
public Queue queue1() {
//1.只对首次声明它的连接(Connection)可见 2.会在其连接断开的时候自动删除
//p1:队列名称 p2:是否持久化 p3:排他性队列 p4:自动删除(删除条件:当没有交换机跟队列绑定) p5:额外参数
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 20000);
args.put("x-dead-letter-exchange", exchange_dead);
args.put("x-dead-letter-routing-key", routing_dead_2);
return new Queue(queue_dead_1, true, false, false, args);
}
/**
* 进行交换机跟队列1绑定
*/
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);
}
//注册队列对象2
@Bean
public Queue queue2(){
//1.只对首次声明它的连接(Connection)可见 2.会在其连接断开的时候自动删除
//p1:队列名称 p2:是否持久化 p3:排他性队列 p4:自动删除(删除条件:当没有交换机跟队列绑定) p5:额外参数
return new Queue(queue_dead_2, true, false, false);
}
//进行交换机跟队列2绑定
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);
}
}
service-mq
模块 ProducerController
/**
* 基于死信实现延迟消息
*
* @param data
* @return
*/
@GetMapping("/sendDeadLetterMsg")
public Result sendDeadLetterMsg(@RequestParam("data") String data) {
rabbitService.sendMessage(DeadLetterConfig.exchange_dead, DeadLetterConfig.routing_dead_1, data);
log.info("基于死信-发送延迟消息成功");
return Result.ok();
}
service-mq
模块
package com.atguigu.gmall.mq.receiver;
import com.atguigu.gmall.mq.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author: atguigu
* @create: 2023-09-16 15:41
*/
@Slf4j
@Component
public class DeadLetterMsgReceiver {
/**
* @param data
* @param channel 信道
* @param message 消息对象
*/
@SneakyThrows
@RabbitListener(queues = DeadLetterConfig.queue_dead_2)
public void receiverMessage(String data, Channel channel, Message message) {
if (StringUtils.isNotBlank(data)) {
log.info("[消费者]监听到基于死信实现的延迟消息:{}", data);
}
//默认消费者:Broker服务器只负责将消息投递,无论消费者业务成功或者失败,Broker自动将消息删除
//消费者端进行手动应答:Broker服务器投递消息后,一致等待消费者进行应答,Broker收到应答才会将MQ队列中消息删除
//basicAck业务正常执行完毕 p1:消息唯一标识从1开始递增 p2:是否批量确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//basicNack业务非正常执行-进行重试(设置上限) p1:消息标识 p2:是否批量确认消息 p3:是否重新入队
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//basicReject消费者明确拒绝接受消息,被拒绝消息会进入死信队列 p1:消息标识 p3:是否重新入队
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
Rabbitmq实现了一个插件x-delay-message来实现延时队列
首先我们将刚下载下来的rabbitmq_delayed_message_exchange-3.9.0.ez文件上传到RabbitMQ所在服务器,下载地址:https://www.rabbitmq.com/community-plugins.html
切换到插件所在目录,执行命令,将刚插件拷贝到容器内plugins目录下
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez gmalldocker_rabbitmq_1:/plugins
执行 docker exec -it gmalldocker_rabbitmq_1 /bin/bash 命令进入到容器内部,并 cd plugins 进入plugins目录
执行 ls -l|grep delay 命令查看插件是否copy成功
在容器内plugins目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启用插件
exit命令退出RabbitMQ容器内部,然后执行 docker restart gmalldocker_rabbitmq_1 命令重启RabbitMQ容器
在service-mq
中添加类,配置队列
package com.atguigu.gmall.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedMqConfig {
public static final String exchange_delay = "exchange.delay";
public static final String routing_delay = "routing.delay";
public static final String queue_delay_1 = "queue.delay.1";
@Bean
public Queue delayQeue1() {
// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
return new Queue(queue_delay_1, true);
}
/**
* 声明延迟类型交换机
* 前提:必须要保证MQ配置开启延迟插件
* @return
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);
}
@Bean
public Binding delayBbinding1() {
return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();
}
}
ProducerController:发送消息
/**
* 基于延迟插件实现延迟消息
*
* @param data
* @return
*/
@GetMapping("/sendDelayMsg")
public Result sendDelayMsg(@RequestParam("data") String data, @RequestParam("delayTime") int delayTime) {
rabbitService.sendDelayMessage(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, data, delayTime);
log.info("基于延迟插件-发送延迟消息成功");
return Result.ok();
}
rabbit-util
中RabbitService
中封装发送延迟消息方法,队列确认方法中增加延迟队列判断
/***
* 发送延迟消息方法
* @param exchange
* @param routingKey
* @param msg
* @param dealyTime 延迟时间,单位设置为:秒
*/
public void sendDelayMessage(String exchange, String routingKey, Object msg, Integer dealyTime) {
//1.发送消息同时设置相关数据
GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();
String id = UUID.randomUUID().toString().replaceAll("-", "");
gmallCorrelationData.setId(id);
gmallCorrelationData.setExchange(exchange);
gmallCorrelationData.setRoutingKey(routingKey);
gmallCorrelationData.setMessage(msg);
gmallCorrelationData.setDelay(true);
gmallCorrelationData.setDelayTime(dealyTime);
//2.将相关数据存入Redis,设置5分钟有效期
String key = "mq:" + id;
String correlationDataStr = JSON.toJSONString(gmallCorrelationData);
redisTemplate.opsForValue().set(key, correlationDataStr, 5, TimeUnit.MINUTES);
//3.执行发送消息-设置相关数据
rabbitTemplate.convertAndSend(exchange, routingKey, msg, (message -> {
//设置消息过期时间
message.getMessageProperties().setDelay(dealyTime * 1000);
return message;
}), gmallCorrelationData);
}
MQProducerAckConfig
队列确认增加延迟消息判断
/**
* 当生产者发送消息异常,在人工介入前程序自动进行重试
*
* @param id 相关数据标识,根据该标识获取存在Redis中相关数据信息
*/
private void retryMessage(String id) {
//1.构建相关数据Key
String key = "mq:" + id;
//2.查询Redis中存放相关数据-注意该数据存采用Strin g
String correlationDataStr = (String) redisTemplate.opsForValue().get(key);
GmallCorrelationData gmallCorrelationData = JSON.parseObject(correlationDataStr, GmallCorrelationData.class);
//重发对于延迟消息处理方式一:忽略延迟消息
if (gmallCorrelationData.isDelay()) {
return;
}
//3.判断重试是否超限 阈值:3次
int retryCount = gmallCorrelationData.getRetryCount();
if (retryCount > 2) {
//4.如果重试超限,写入生产者消息异常表(将来由人工处理)
log.error("[RabbitMQ]消息重发超限:相关数据:{}", correlationDataStr);
return;
}
//5.如果重试未超限,再次发送业务消息+相关数据 更新重试次数
//5.1 更新重试次数
gmallCorrelationData.setRetryCount(retryCount + 1);
log.info("正在执行第:{}次重发消息。", retryCount + 1);
Long ttl = redisTemplate.getExpire(key);
redisTemplate.opsForValue().set(key, JSON.toJSONString(gmallCorrelationData), ttl, TimeUnit.SECONDS);
//5.2 将发送失败消息再次进行重发
if (gmallCorrelationData.isDelay()) {
//延迟消息重发
rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(), gmallCorrelationData.getRoutingKey(), gmallCorrelationData.getMessage(), message -> {
message.getMessageProperties().setDelay(gmallCorrelationData.getDelayTime() * 1000);
return message;
}, gmallCorrelationData);
} else {
//普通消息重发
rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(), gmallCorrelationData.getRoutingKey(), gmallCorrelationData.getMessage(), gmallCorrelationData);
}
}
接收消息,消费者端判断是否需要做幂等性处理
package com.atguigu.gmall.mq.receiver;
import com.atguigu.gmall.mq.config.DeadLetterConfig;
import com.atguigu.gmall.mq.config.DelayedMqConfig;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @author: atguigu
* @create: 2023-09-16 15:41
*/
@Slf4j
@Component
public class DelayMsgReceiver {
@Autowired
private RedisTemplate redisTemplate;
/**
* @param data
* @param channel 信道
* @param message 消息对象
*/
@SneakyThrows
@RabbitListener(queues = DelayedMqConfig.queue_delay_1)
public void receiverMessage(String data, Channel channel, Message message) {
if (StringUtils.isNotBlank(data)) {
log.info("[消费者]监听到基于延迟插件实现的延迟消息:{}", data);
//判断当前消费者业务端是否需要满足幂等性 ,只处理第一次消息 1.利用MySQL唯一约束 2.利用Redis set nx
//1.获取消息中唯一业务标识,例如订单id,编号。保证消息幂等性
String key = "mq:" + data;
//2.将业务标识存入Redis 设置过期时间
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, data, 5, TimeUnit.MINUTES);
if (!flag) {
//该项业务数据已被执行
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
}
log.info("业务执行...................");
}
//默认消费者:Broker服务器只负责将消息投递,无论消费者业务成功或者失败,Broker自动将消息删除
//消费者端进行手动应答:Broker服务器投递消息后,一致等待消费者进行应答,Broker收到应答才会将MQ队列中消息删除
//basicAck业务正常执行完毕 p1:消息唯一标识从1开始递增 p2:是否批量确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//basicNack业务非正常执行-进行重试(设置上限) p1:消息标识 p2:是否批量确认消息 p3:是否重新入队
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//basicReject消费者明确拒绝接受消息,被拒绝消息会进入死信队列 p1:消息标识 p3:是否重新入队
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
消费结果会发送三次,也被消费三次!
如何保证消息幂等性?
使用数据库方式
使用redis setnx 命令解决(推荐)
service-order模块
rabbit-util模块已配置常量MqConst
/**
* 取消订单,发送延迟队列
*/
public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "exchange.direct.order.cancel";//"exchange.direct.order.create" test_exchange;
public static final String ROUTING_ORDER_CANCEL = "order.create";
//延迟取消订单队列
public static final String QUEUE_ORDER_CANCEL = "queue.order.cancel";
//取消订单 延迟时间 单位:秒
public static final int DELAY_TIME = 10;
service-order
模块添加依赖
<!--rabbitmq工具模块-->
<dependency>
<groupId>com.atguigu.gmall</groupId>
<artifactId>rabbit-util</artifactId>
<version>1.0</version>
</dependency>
配置队列
package com.atguigu.gmall.order.config;
import com.atguigu.gmall.rabbit.config.MqConst;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 创建取消订单延迟关闭订单需要交换机跟队列完成绑定
*/
@Configuration
public class CancelOrderBindingConfig {
@Bean
public Queue cancelOrderQueue() {
// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
return new Queue(MqConst.QUEUE_ORDER_CANCEL, true);
}
/**
* 声明延迟类型交换机
* 前提:必须要保证MQ配置开启延迟插件
* @return
*/
@Bean
public CustomExchange cancelOrderExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange(MqConst.EXCHANGE_DIRECT_ORDER_CANCEL, "x-delayed-message", true, false, args);
}
@Bean
public Binding delayBbinding1() {
return BindingBuilder.bind(cancelOrderQueue()).to(cancelOrderExchange()).with(MqConst.ROUTING_ORDER_CANCEL).noargs();
}
}
创建订单时,发送延迟消息
修改保存订单方法submitOrder
@Autowired
private RabbitService rabbitService;
/**
* 提交保存订单
*
* @param orderInfo
* @param tradeNo
* @return
*/
@Override
public Long submitOrder(OrderInfo orderInfo, String tradeNo) {
//6.TODO 商城订单采用延迟消息方式-检查订单&关闭订单
//TODO 测试设置为20秒延迟 真实延迟时间跟订单设置过期时间 动态计算出来
Date expireTime = orderInfo.getExpireTime();
Date now = new Date();
Long ttl = (expireTime.getTime() - now.getTime())/1000;
rabbitService.sendDelayMessage(MqConst.EXCHANGE_DIRECT_ORDER_CANCEL, MqConst.ROUTING_ORDER_CANCEL, orderId, ttl.intValue());
//TODO 订单提交后购物车数据清理,开发阶段不清理
return orderId;
}
package com.atguigu.gmall.order.reciever;
import com.atguigu.gmall.order.service.OrderInfoService;
import com.atguigu.gmall.rabbit.config.MqConst;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author: atguigu
* @create: 2023-09-18 16:00
*/
@Slf4j
@Component
public class OrderReceiver {
@Autowired
private OrderInfoService orderInfoService;
/**
* 延迟关闭订单消费者
*
* @param orderId 订单ID
* @param channel
* @param message
*/
@SneakyThrows
@RabbitListener(queues = MqConst.QUEUE_ORDER_CANCEL)
public void processCloseOrder(Long orderId, Channel channel, Message message) {
//1.业务处理 可以不处理幂等
if (orderId != null) {
log.info("[订单服务]监听到延迟关单消息:{}", orderId);
orderInfoService.execCloseOrder(orderId);
}
//2.手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
OrderInfoService
/**
* 验证订单支付状态且关闭订单
*
* @param orderId
*/
void execCloseOrder(Long orderId);
/**
* 将指定订单改为指定状态
*
* @param orderId
*/
void updateOrderStatus(Long orderId, ProcessStatus processStatus);
OrderInfoServiceImpl
/**
* 根据订单ID查询订单支付状态,如果订单状态为:未支付,将其改为关闭
*
* @param orderId
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void execCloseOrder(Long orderId) {
//1.根据订单ID查询订单信息
OrderInfo orderInfo = this.getById(orderId);
//2.判断订单状态
if (orderId != null && PaymentStatus.UNPAID.name().equals(orderInfo.getOrderStatus())) {
this.updateOrderStatus(orderId, ProcessStatus.CLOSED);
}
}
/**
* 根据订单ID将订单改为指定状态
*
* @param orderId
* @param processStatus
*/
@Override
public void updateOrderStatus(Long orderId, ProcessStatus processStatus) {
OrderInfo orderInfo = new OrderInfo();
orderInfo.setId(orderId);
orderInfo.setOrderStatus(processStatus.getOrderStatus().name());
orderInfo.setProcessStatus(processStatus.name());
this.updateById(orderInfo);
}