[TOC]
# 库存
## 1 RabbitMQ
### 1.1 搭建spzx-common-rabbit模块
由于消息队列是公共模块,我们把mq的相关代码(生产者)封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可
#### 1.1.1 新建模块
在`spzx-common`模块下新建`spzx-common-rabbit`模块
#### 1.1.2 pom.xml
```xml
com.spzx
spzx-common
3.6.3
4.0.0
spzx-common-rabbit
spzx-common-rabbit服务
org.springframework.boot
spring-boot-starter-web
org.springframework.cloud
spring-cloud-starter-bus-amqp
com.alibaba.fastjson2
fastjson2
org.projectlombok
lombok
com.spzx
spzx-common-redis
```
#### 1.1.3 RabbitService
```java
package com.spzx.common.rabbit.service;
public class RabbitService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*
* @param exchange 交换机
* @param routingKey 路由键
* @param message 消息
*/
public boolean sendMessage(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return true;
}
}
```
#### 1.1.4 加载配置类
在resources目录中创建
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
```java
com.spzx.common.rabbit.service.RabbitService
```
#### 1.1.5 MqConst
提供常量类 MqConst
```java
package com.spzx.common.rabbit.constant;
public class MqConst {
/**
* 测试
*/
public static final String EXCHANGE_TEST = "spzx.exchange.test";
public static final String ROUTING_TEST = "spzx.routing.test";
public static final String ROUTING_CONFIRM = "spzx.routing.confirm";
//队列
public static final String QUEUE_TEST = "spzx.queue.test";
public static final String QUEUE_CONFIRM = "spzx.queue.confirm";
/**
* 库存
*/
public static final String EXCHANGE_PRODUCT = "spzx.exchange.product";
public static final String ROUTING_UNLOCK = "spzx.routing.unlock";
public static final String ROUTING_MINUS = "spzx.routing.minus";
//队列
public static final String QUEUE_UNLOCK = "spzx.queue.unlock";
public static final String QUEUE_MINUS = "spzx.queue.minus";
/**
* 支付
*/
public static final String EXCHANGE_PAYMENT_PAY = "spzx.exchange.payment";
public static final String ROUTING_PAYMENT_PAY = "spzx.routing.payment.pay";
public static final String ROUTING_PAYMENT_CLOSE = "spzx.routing.payment.close";;
public static final String QUEUE_PAYMENT_PAY = "spzx.queue.payment.pay";
public static final String QUEUE_PAYMENT_CLOSE = "spzx.queue.payment.close";
/**
* 取消订单延迟消息
*/
public static final String EXCHANGE_CANCEL_ORDER = "spzx.exchange.cancel.order";
public static final String ROUTING_CANCEL_ORDER = "spzx.routing.cancel.order";
public static final String QUEUE_CANCEL_ORDER = "spzx.queue.cancel.order";
public static final Integer CANCEL_ORDER_DELAY_TIME = 15 * 60;
}
```
### 1.2 RabbitMQ测试
我们在`spzx-order`模块测试mq消息
#### 1.2.1 配置RabbitMQ
在nacos配置中心,spzx-order-dev.yml文件添加配置
```yaml
spring:
rabbitmq:
host: 192.168.100.131
port: 5672
username: guest
password: guest
```
说明:host改为实际的IP
#### 1.2.2 引入spzx-common-rabbit模块
在`spzx-order`模块pom.xml文件添加依赖
```xml
com.spzx
spzx-common-rabbit
3.6.3
```
#### 1.2.3 MqController
发送消息
```java
package com.spzx.order.controller;
@Tag(name = "Mq接口管理")
@RestController
@RequestMapping("/mq")
public class MqController extends BaseController
{
@Autowired
private RabbitService rabbitService;
@Operation(summary = "发送消息")
@GetMapping("/sendMessage")
public AjaxResult sendMessage()
{
rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_TEST, "hello");
return success();
}
}
```
#### 1.2.4 TestReceiver
监听消息:启动spzx-order服务,即可看到exchange、queue以及他们之间的路由绑定关系已经创建出来了
```java
package com.spzx.order.receiver;
@Slf4j
@Component
public class TestReceiver {
/**
* 监听消息
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"),
value = @Queue(value = MqConst.QUEUE_TEST, durable = "true"),
key = MqConst.ROUTING_TEST
))
public void test(String content, Message message) {
//都可以
log.info("接收消息:{}", content);
log.info("接收消息:{}", new String(message.getBody()));
}
}
```
#### 1.2.5 knife4j测试
发送消息

监听消息:查看idea打印结果
### 1.3 消息可靠性配置
#### 1.3.1 介绍
MQ消息的可靠性,一般需要三个方面一起保证:
1. 生产者不丢数据(可靠性投递)
2. MQ服务器不丢数据(可靠性存储)
3. 消费者不丢数据(可靠性消费)
#### 1.3.2 消息发送确认配置
消息发送确认可以保证生产者不丢数据
##### 1 封装发送端消息配置类
生产者不丢数据从两个方面保证:**生产者确认机制、生产者退回机制**
操作模块:`spzx-common-rabbit`
```java
package com.spzx.common.rabbit.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitInitConfigApplicationListener implements ApplicationListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
this.setupCallbacks();
}
private void setupCallbacks() {
/**
* 只确认消息是否正确到达 Exchange 中,成功与否都会回调
*
* @param correlation 相关数据 非消息本身业务数据
* @param ack 应答结果
* @param reason 如果发送消息到交换器失败,错误原因
*/
this.rabbitTemplate.setConfirmCallback((correlationData, ack, reason) -> {
if (ack) {
//消息到交换器成功
log.info("消息发送到Exchange成功:{}", correlationData);
} else {
//消息到交换器失败
log.error("消息发送到Exchange失败:{}", reason);
}
});
/**
* 消息没有正确到达队列时触发回调,如果正确到达队列不执行
*/
//默认情况下mandatory的值是false
//但是只要配置开启了publisher-returns则mandatory的值就是true
//this.rabbitTemplate.setMandatory(true);//是否让rabbitmq将失败的消息的信息再次返回给生产者
this.rabbitTemplate.setReturnsCallback(returned -> {
log.error("返回: " + returned.getMessage()
+ "\n 响应码: " + returned.getReplyCode()
+ "\n 响应消息: " + returned.getReplyText()
+ "\n 交换机: " + returned.getExchange()
+ "\n 路由: " + returned.getRoutingKey());
});
}
}
```
##### 2 加载配置类
resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
```java
com.spzx.common.rabbit.config.RabbitInitConfigApplicationListener
```
##### 3 修改配置
在nacos配置中心,修改spzx-order-dev.yml配置
```yaml
spring:
rabbitmq:
host: 192.168.100.131
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED #开启生产者确认机制
publisher-returns: true #开启生产者退回机制
listener:
simple:
acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
```
##### 4 MqController
发送确认消息
```java
@Operation(summary = "发送确认消息")
@GetMapping("/sendConfirmMessage")
public AjaxResult sendConfirmMessage()
{
rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_CONFIRM, "hello, confirm");
return success();
}
```
##### 5 TestReceiver
监听确认消息
```java
/**
* 监听确认消息
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"),
value = @Queue(value = MqConst.QUEUE_CONFIRM, durable = "true"),
key = MqConst.ROUTING_CONFIRM
))
public void confirm(String content, Message message, Channel channel) {
log.info("接收确认消息:{}", content);
// false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
```
## 2 库存接口
我们的商品不允许超卖,为了防止超卖,我们下单必须检查与锁定库存,下单失败或取消订单要解锁库存,支付成功扣减库存
### 2.1 下单检查与锁定库存
#### 2.1.1 远程调用接口
##### SkuLockVo
操作模块:spzx-api-product
```java
package com.spzx.product.api.domain;
@Data
public class SkuLockVo
{
private Long skuId;
private Integer skuNum;
private String skuName;
/** 是否有库存 **/
private boolean isHaveStock = false;
//private Boolean isHaveStock = false;//Boolean和boolean生成的getter、setter不一样
}
```
操作模块:spzx-product
##### SkuStockMapper
```java
//校验sku库存
SkuStock check(@Param("skuId") Long skuId, @Param("num")Integer num);
//锁定sku库存
Integer lock(@Param("skuId") Long skuId, @Param("num")Integer num);
```
##### SkuStockMapper.xml
```xml
UPDATE sku_stock set lock_num = lock_num + #{skuNum} , available_num = available_num - #{skuNum} where sku_id = #{skuId} and del_flag = 0;
```
##### SkuStockController
```java
package com.spzx.product.controller;
import com.spzx.common.core.domain.R;
import com.spzx.common.core.web.controller.BaseController;
import com.spzx.common.security.annotation.InnerAuth;
import com.spzx.product.service.ISkuStockService;
import com.spzx.product.vo.SkuLockVo;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
*
* 商品sku库存表 前端控制器
*
*
* @author atguigu
* @since 2025-04-28
*/
@RestController
@RequestMapping("/skuStock")
public class SkuStockController extends BaseController {
@Autowired
private ISkuStockService skuStockService;
/**
* 检查与锁定库存,要求不能出现库存超卖
* @param orderNo
* @param skuLockVoList
* @return
*/
@InnerAuth
@Operation(summary = "检查与锁定库存")
@PostMapping("/checkAndLock/{orderNo}")
public R checkAndLock(@PathVariable String orderNo, @RequestBody List skuLockVoList) {
String stockErrorMsg = skuStockService.checkAndLock(orderNo, skuLockVoList);
return R.ok(stockErrorMsg);
}
}
```
##### ISkuStockService
```java
/**
* 检查并锁定库存,避免库存“超卖”
* @param orderNo
* @param skuLockVoList
* @return
*/
String checkAndLock(String orderNo, List skuLockVoList);
```
##### SkuStockServiceImpl
```java
@Autowired
private RedisTemplate redisTemplate;
/**
* 检查并锁定库存,避免库存“超卖”
*
* @param orderNo
* @param skuLockVoList
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public String checkAndLock(String orderNo, List skuLockVoList) {
//1.遍历商品扣减VO列表,检查是否满足锁定库存条件,为VO中属性是否有库存赋值
for (SkuLockVo skuLockVo : skuLockVoList) {
//1.1 判断是否满足锁定库存,采用MySQL的悲观锁来进行锁定:select语句where条件+for update
SkuStock skuStock = baseMapper.checkStock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
//1.2 根据select悲观锁结果来判断是否有库存
if (skuStock != null) {
//有库存就将VO的属性设置为true
skuLockVo.setHaveStock(true);
} else {
//没有库存,将VO的属性设置为false
skuLockVo.setHaveStock(false);
}
}
//2.如果有任意一件商品无法锁定,则锁定失败,封装锁定库存商品提示信息
StringBuilder stockErrorMsg = new StringBuilder("");
boolean flag = skuLockVoList.stream().anyMatch(skuLockVo -> !skuLockVo.isHaveStock());
if (flag) {
//2.1遍历商品库存锁定VO列表 找出所有没有锁定成功的商品
for (SkuLockVo skuLockVo : skuLockVoList) {
if (!skuLockVo.isHaveStock()) {
//2.2 查询当前库存不足商品实时可用库存数量
SkuStock skuStock = baseMapper.selectOne(
new LambdaQueryWrapper().eq(SkuStock::getSkuId, skuLockVo.getSkuId())
);
//2.3 拼接错误提示信息
stockErrorMsg.append(skuLockVo.getSkuName())
.append("库存不足,当前剩余:")
.append(skuStock.getAvailableNum())
.append(";");
}
}
}
if (StringUtils.isNotBlank(stockErrorMsg.toString())) {
return stockErrorMsg.toString();
}
//3.如果全部商品可以锁定成功,则进行商品库存锁定
for (SkuLockVo skuLockVo : skuLockVoList) {
baseMapper.lockStock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
}
//4.将商品锁定库存信息存在Redis-用于取消订单解锁库存/订单支付成功最终库存扣减
String dataKey = "sku:lock:data:" + orderNo;
redisTemplate.opsForValue().set(dataKey, skuLockVoList);
return null;
}
```
#### 1.1.2、openFeign接口定义
操作模块:spzx-api-product
##### RemoteSkuStockService
```java
@FeignClient(
contextId = "remoteSkuStockService",
value = ServiceNameConstants.PRODUCT_SERVICE,
fallbackFactory = RemoteSkuStockFallbackFactory.class
)
public interface RemoteSkuStockService {
@PostMapping("/skuStock/checkAndLock/{orderNo}")
R checkAndLock(
@PathVariable("orderNo") String orderNo,
@RequestBody List skuLockVoList,
@RequestHeader(SecurityConstants.FROM_SOURCE) String source
);
}
```
##### RemoteSkuStockFallbackFactory
```java
package com.spzx.product.api.factory;
public class RemoteSkuStockFallbackFactory implements FallbackFactory {
@Override
public RemoteSkuStockService create(Throwable cause) {
return new RemoteSkuStockService() {
@Override
public R checkAndLock(String orderNo, List skuLockVoList, String source) {
return R.fail("检查与锁定商品失败:" + cause.getMessage());
}
};
}
}
```
##### 加载配置类
org.springframework.boot.autoconfigure.AutoConfiguration.imports
```
com.spzx.product.api.fallback.RemoteSkuStockFallbackFactory
```
#### 1.1.3、下单接口改造
操作模块:`spzx-order`
##### OrderInfoServiceImpl
```java
@Autowired
private RemoteSkuStockService remoteSkuStockService;
@Transactional(rollbackFor = Exception.class)
@Override
public Long submitOrder(OrderForm orderForm) {
...
//3 校验库存并锁定库存
List skuLockVoList = orderItemList.stream().map(item -> {
SkuLockVo skuLockVo = new SkuLockVo();
skuLockVo.setSkuId(item.getSkuId());
skuLockVo.setSkuNum(item.getSkuNum());
return skuLockVo;
}).collect(Collectors.toList());
String checkAndLockResult = remoteSkuStockService.checkAndLock(
orderForm.getTradeNo(),
skuLockVoList,
SecurityConstants.INNER).getData();
if(StringUtils.isNotEmpty(checkAndLockResult)) {
throw new ServiceException(checkAndLockResult);
}
...
return orderId;
}
```
### 2.2 解锁库存
如果下单失败,则接口会抛出异常,那么我们需要将已锁定的库存进行解锁。
#### 2.2.1 下单接口改造
操作模块:`spzx-order`
##### OrderInfoServiceImpl
下单异常时需要解锁库存:
```java
Long orderId = null;
try {
//下单
orderId = this.saveOrder(orderForm);
} catch (Exception e) {
e.printStackTrace();
//下单失败,解锁库存
rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderForm.getTradeNo());
//抛出异常
throw new ServiceException("下单失败");
}
```
取消订单时需要解锁库存:
```java
@Override
@Transactional(rollbackFor = Exception.class)
public void cancelOrder(Long orderId) {
OrderInfo orderInfo = baseMapper.selectById(orderId);
if(null != orderInfo && orderInfo.getOrderStatus().intValue() == 0) {
......
//发送MQ消息通知商品系统解锁库存:TODO
rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderInfo.getOrderNo());
}
}
```
#### 2.2.2 远程调用接口
操作模块:`spzx-product`
##### 1 pom.xml
添加依赖
```xml
com.spzx
spzx-common-rabbit
3.6.3
```
##### 2 spzx-product-dev.yml
添加配置
```yaml
spring:
rabbitmq:
host: 192.168.100.131
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED
publisher-returns: true
listener:
simple:
cknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
```
##### 3 ProductReceiver
```java
package com.spzx.product.receiver;
@Slf4j
@Component
public class ProductReceiver {
@Autowired
private ISkuStockService skuStockService;
/**
* 解锁库存
* @param orderNo 订单号
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_PRODUCT, durable = "true"),
value = @Queue(value = MqConst.QUEUE_UNLOCK, durable = "true"),
key = {MqConst.ROUTING_UNLOCK}
))
public void unlock(String orderNo, Message message, Channel channel) {
//业务处理
log.info("[商品服务]监听解锁库存消息:{}", orderNo);
//解锁库存
skuStockService.unlock(orderNo);
//手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
```
##### 4 ISkuStockService
```java
void unlock(String orderNo);
```
##### 5 skuStockServiceImpl
```java
@Transactional(rollbackFor = {Exception.class})
@Override
public void unlock(String orderNo) {
//幂等性处理
String key = "sku:unlock:" + orderNo;
Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, orderNo, 1, TimeUnit.HOURS);
if(!isFirst) return;
// 获取锁定库存的缓存信息
String dataKey = "sku:lock:data:" + orderNo;
List skuLockVoList = (List)this.redisTemplate.opsForValue().get(dataKey);
if (CollectionUtils.isEmpty(skuLockVoList)){
return;
}
// 解锁库存
skuLockVoList.forEach(skuLockVo -> {
int row = baseMapper.unlock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
});
// 解锁库存之后,删除锁定库存的缓存。
this.redisTemplate.delete(dataKey);
}
```
##### 6 SkuStockMapper
```java
Integer unlock(@Param("skuId") Long skuId, @Param("num")Integer num);
```
##### 7 SkuStockMapper.xml
```xml
update sku_stock
set lock_num = lock_num - #{num}, available_num = available_num + #{num}
where sku_id = #{skuId}
```