[TOC]
# 库存
## 1 RabbitMQ
### 1.1 搭建spzx-common-rabbit模块
由于消息队列是公共模块,我们把mq的相关代码(生产者)封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可
#### 1.1.1 新建模块
在`spzx-common`模块下新建`spzx-common-rabbit`模块
#### 1.1.2 pom.xml
```xml
com.spzx
spzx-common
3.6.3
4.0.0
spzx-common-rabbit
spzx-common-rabbit服务
org.springframework.boot
spring-boot-starter-web
org.springframework.cloud
spring-cloud-starter-bus-amqp
com.alibaba.fastjson2
fastjson2
org.projectlombok
lombok
com.spzx
spzx-common-redis
```
#### 1.1.3 RabbitService
```java
package com.spzx.common.rabbit.service;
public class RabbitService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*
* @param exchange 交换机
* @param routingKey 路由键
* @param message 消息
*/
public boolean sendMessage(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return true;
}
}
```
#### 1.1.4 加载配置类
在resources目录中创建
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
```java
com.spzx.common.rabbit.service.RabbitService
```
#### 1.1.5 MqConst
提供常量类 MqConst
```java
package com.spzx.common.rabbit.constant;
public class MqConst {
/**
* 测试
*/
public static final String EXCHANGE_TEST = "spzx.exchange.test";
public static final String ROUTING_TEST = "spzx.routing.test";
public static final String ROUTING_CONFIRM = "spzx.routing.confirm";
//队列
public static final String QUEUE_TEST = "spzx.queue.test";
public static final String QUEUE_CONFIRM = "spzx.queue.confirm";
/**
* 库存
*/
public static final String EXCHANGE_PRODUCT = "spzx.exchange.product";
public static final String ROUTING_UNLOCK = "spzx.routing.unlock";
public static final String ROUTING_MINUS = "spzx.routing.minus";
//队列
public static final String QUEUE_UNLOCK = "spzx.queue.unlock";
public static final String QUEUE_MINUS = "spzx.queue.minus";
/**
* 支付
*/
public static final String EXCHANGE_PAYMENT_PAY = "spzx.exchange.payment";
public static final String ROUTING_PAYMENT_PAY = "spzx.routing.payment.pay";
public static final String ROUTING_PAYMENT_CLOSE = "spzx.routing.payment.close";;
public static final String QUEUE_PAYMENT_PAY = "spzx.queue.payment.pay";
public static final String QUEUE_PAYMENT_CLOSE = "spzx.queue.payment.close";
/**
* 取消订单延迟消息
*/
public static final String EXCHANGE_CANCEL_ORDER = "spzx.exchange.cancel.order";
public static final String ROUTING_CANCEL_ORDER = "spzx.routing.cancel.order";
public static final String QUEUE_CANCEL_ORDER = "spzx.queue.cancel.order";
public static final Integer CANCEL_ORDER_DELAY_TIME = 15 * 60;
}
```
### 1.2 RabbitMQ测试
我们在`spzx-order`模块测试mq消息
#### 1.2.1 配置RabbitMQ
在nacos配置中心,spzx-order-dev.yml文件添加配置
```yaml
spring:
rabbitmq:
host: 192.168.100.131
port: 5672
username: guest
password: guest
```
说明:host改为实际的IP
#### 1.2.2 引入spzx-common-rabbit模块
在`spzx-order`模块pom.xml文件添加依赖
```xml
com.spzx
spzx-common-rabbit
3.6.3
```
#### 1.2.3 MqController
发送消息
```java
package com.spzx.order.controller;
@Tag(name = "Mq接口管理")
@RestController
@RequestMapping("/mq")
public class MqController extends BaseController
{
@Autowired
private RabbitService rabbitService;
@Operation(summary = "发送消息")
@GetMapping("/sendMessage")
public AjaxResult sendMessage()
{
rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_TEST, "hello");
return success();
}
}
```
#### 1.2.4 TestReceiver
监听消息:启动spzx-order服务,即可看到exchange、queue以及他们之间的路由绑定关系已经创建出来了
```java
package com.spzx.order.receiver;
@Slf4j
@Component
public class TestReceiver {
/**
* 监听消息
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"),
value = @Queue(value = MqConst.QUEUE_TEST, durable = "true"),
key = MqConst.ROUTING_TEST
))
public void test(String content, Message message) {
//都可以
log.info("接收消息:{}", content);
log.info("接收消息:{}", new String(message.getBody()));
}
}
```
#### 1.2.5 knife4j测试
发送消息

监听消息:查看idea打印结果
### 1.3 消息可靠性配置
#### 1.3.1 介绍
MQ消息的可靠性,一般需要三个方面一起保证:
1. 生产者不丢数据(可靠性投递)
2. MQ服务器不丢数据(可靠性存储)
3. 消费者不丢数据(可靠性消费)
#### 1.3.2 消息发送确认配置
消息发送确认可以保证生产者不丢数据
##### 1 封装发送端消息配置类
生产者不丢数据从两个方面保证:**生产者确认机制、生产者退回机制**
操作模块:`spzx-common-rabbit`
```java
package com.spzx.common.rabbit.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitInitConfigApplicationListener implements ApplicationListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
this.setupCallbacks();
}
private void setupCallbacks() {
/**
* 只确认消息是否正确到达 Exchange 中,成功与否都会回调
*
* @param correlation 相关数据 非消息本身业务数据
* @param ack 应答结果
* @param reason 如果发送消息到交换器失败,错误原因
*/
this.rabbitTemplate.setConfirmCallback((correlationData, ack, reason) -> {
if (ack) {
//消息到交换器成功
log.info("消息发送到Exchange成功:{}", correlationData);
} else {
//消息到交换器失败
log.error("消息发送到Exchange失败:{}", reason);
}
});
/**
* 消息没有正确到达队列时触发回调,如果正确到达队列不执行
*/
//默认情况下mandatory的值是false
//但是只要配置开启了publisher-returns则mandatory的值就是true
//this.rabbitTemplate.setMandatory(true);//是否让rabbitmq将失败的消息的信息再次返回给生产者
this.rabbitTemplate.setReturnsCallback(returned -> {
log.error("返回: " + returned.getMessage()
+ "\n 响应码: " + returned.getReplyCode()
+ "\n 响应消息: " + returned.getReplyText()
+ "\n 交换机: " + returned.getExchange()
+ "\n 路由: " + returned.getRoutingKey());
});
}
}
```
##### 2 加载配置类
resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
```java
com.spzx.common.rabbit.config.RabbitInitConfigApplicationListener
```
##### 3 修改配置
在nacos配置中心,修改spzx-order-dev.yml配置
```yaml
spring:
rabbitmq:
host: 192.168.100.131
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED #开启生产者确认机制
publisher-returns: true #开启生产者退回机制
listener:
simple:
acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
```
##### 4 MqController
发送确认消息
```java
@Operation(summary = "发送确认消息")
@GetMapping("/sendConfirmMessage")
public AjaxResult sendConfirmMessage()
{
rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_CONFIRM, "hello, confirm");
return success();
}
```
##### 5 TestReceiver
监听确认消息
```java
/**
* 监听确认消息
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"),
value = @Queue(value = MqConst.QUEUE_CONFIRM, durable = "true"),
key = MqConst.ROUTING_CONFIRM
))
public void confirm(String content, Message message, Channel channel) {
log.info("接收确认消息:{}", content);
// false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
```
## 2 库存接口
我们的商品不允许超卖,为了防止超卖,我们下单必须检查与锁定库存,下单失败或取消订单要解锁库存,支付成功扣减库存
### 2.1 下单检查与锁定库存
#### 2.1.1 远程调用接口
##### SkuLockVo
操作模块:spzx-api-product
```java
package com.spzx.product.api.domain;
@Data
public class SkuLockVo
{
private Long skuId;
private Integer skuNum;
private String skuName;
/** 是否有库存 **/
private boolean isHaveStock = false;
//private Boolean isHaveStock = false;//Boolean和boolean生成的getter、setter不一样
}
```
操作模块:spzx-product
##### SkuStockMapper
```java
//校验sku库存
SkuStock check(@Param("skuId") Long skuId, @Param("num")Integer num);
//锁定sku库存
Integer lock(@Param("skuId") Long skuId, @Param("num")Integer num);
```
##### SkuStockMapper.xml
```xml
update sku_stock
set lock_num = lock_num + #{num}, available_num = available_num - #{num}
where sku_id = #{skuId}
```
##### SkuStockController
```java
@RestController
@RequestMapping("/skuStock")
public class SkuStockController extends BaseController {
@Autowired
private ISkuStockService skuStockService;
@InnerAuth
@Operation(summary = "检查与锁定库存")
@PostMapping("/checkAndLock/{orderNo}")
public R checkAndLock(@PathVariable String orderNo, @RequestBody List skuLockVoList) {
return R.ok(skuStockService.checkAndLock(orderNo, skuLockVoList));
}
}
```
##### ISkuStockService
```java
String checkAndLock(String orderNo, List skuLockVoList);
```
##### SkuStockServiceImpl
```java
@Autowired
private RedisTemplate redisTemplate;
@Transactional(rollbackFor = {Exception.class})
@Override
public String checkAndLock(String orderNo, List skuLockVoList) {
// 遍历所有商品,验库存并锁库存
skuLockVoList.forEach(skuLockVo -> {
// 验库存:查询,返回是当前sku的库存信息
SkuStock skuStock = baseMapper.check(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
// 如果是null,这里就验库存失败
if (null == skuStock) {
skuLockVo.setHaveStock(false);//没有足够库存
} else {
skuLockVo.setHaveStock(true);//有足够库存
}
});
// 只要有一个商品库存不足
if (skuLockVoList.stream().anyMatch(skuLockVo -> !skuLockVo.isHaveStock())) {
// 过滤出所有库存不足的商品
List noStockList = skuLockVoList
.stream()
.filter(item -> !item.isHaveStock())
.collect(Collectors.toList());
//组装错误提示
StringBuffer errorMsg = new StringBuffer();
for (SkuLockVo skuLockVo : noStockList) {
//查询现有库存
Integer num = baseMapper.selectById(skuLockVo.getSkuId()).getAvailableNum();
errorMsg
.append(skuLockVo.getSkuName())
.append("库存不足,剩余库存:")
.append(num)
.append(";");
}
//返回错误信息
return errorMsg.toString();
} else {//所有商品库存充足
for (SkuLockVo skuLockVo : skuLockVoList) {
//锁定库存
baseMapper.lock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
}
}
// 如果所有商品都锁定成功的情况下,需要缓存锁定信息到redis。以方便将来解锁库存 或者 减库存
String dataKey = "sku:lock:data:" + orderNo;
this.redisTemplate.opsForValue().set(dataKey, skuLockVoList);
return "";
}
```
#### 1.1.2、openFeign接口定义
操作模块:spzx-api-product
##### RemoteSkuStockService
```java
@FeignClient(
contextId = "remoteSkuStockService",
value = ServiceNameConstants.PRODUCT_SERVICE,
fallbackFactory = RemoteSkuStockFallbackFactory.class
)
public interface RemoteSkuStockService {
@PostMapping("/skuStock/checkAndLock/{orderNo}")
R checkAndLock(
@PathVariable("orderNo") String orderNo,
@RequestBody List skuLockVoList,
@RequestHeader(SecurityConstants.FROM_SOURCE) String source
);
}
```
##### RemoteSkuStockFallbackFactory
```java
package com.spzx.product.api.factory;
public class RemoteSkuStockFallbackFactory implements FallbackFactory {
@Override
public RemoteSkuStockService create(Throwable cause) {
return new RemoteSkuStockService() {
@Override
public R checkAndLock(String orderNo, List skuLockVoList, String source) {
return R.fail("检查与锁定商品失败:" + cause.getMessage());
}
};
}
}
```
##### 加载配置类
org.springframework.boot.autoconfigure.AutoConfiguration.imports
```
com.spzx.product.api.fallback.RemoteSkuStockFallbackFactory
```
#### 1.1.3、下单接口改造
操作模块:`spzx-order`
##### OrderInfoServiceImpl
```java
@Autowired
private RemoteSkuStockService remoteSkuStockService;
@Transactional(rollbackFor = Exception.class)
@Override
public Long submitOrder(OrderForm orderForm) {
...
//3 校验库存并锁定库存
List skuLockVoList = orderItemList.stream().map(item -> {
SkuLockVo skuLockVo = new SkuLockVo();
skuLockVo.setSkuId(item.getSkuId());
skuLockVo.setSkuNum(item.getSkuNum());
return skuLockVo;
}).collect(Collectors.toList());
String checkAndLockResult = remoteSkuStockService.checkAndLock(
orderForm.getTradeNo(),
skuLockVoList,
SecurityConstants.INNER).getData();
if(StringUtils.isNotEmpty(checkAndLockResult)) {
throw new ServiceException(checkAndLockResult);
}
...
return orderId;
}
```
### 2.2 解锁库存
如果下单失败,则接口会抛出异常,那么我们需要将已锁定的库存进行解锁。
#### 2.2.1 下单接口改造
操作模块:`spzx-order`
##### OrderInfoServiceImpl
下单异常时需要解锁库存:
```java
Long orderId = null;
try {
//下单
orderId = this.saveOrder(orderForm);
} catch (Exception e) {
e.printStackTrace();
//下单失败,解锁库存
rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderForm.getTradeNo());
//抛出异常
throw new ServiceException("下单失败");
}
```
取消订单时需要解锁库存:
```java
@Override
@Transactional(rollbackFor = Exception.class)
public void cancelOrder(Long orderId) {
OrderInfo orderInfo = baseMapper.selectById(orderId);
if(null != orderInfo && orderInfo.getOrderStatus().intValue() == 0) {
......
//发送MQ消息通知商品系统解锁库存:TODO
rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderInfo.getOrderNo());
}
}
```
#### 2.2.2 远程调用接口
操作模块:`spzx-product`
##### 1 pom.xml
添加依赖
```xml
com.spzx
spzx-common-rabbit
3.6.3
```
##### 2 spzx-product-dev.yml
添加配置
```yaml
spring:
rabbitmq:
host: 192.168.100.131
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED
publisher-returns: true
listener:
simple:
cknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
```
##### 3 ProductReceiver
```java
package com.spzx.product.receiver;
@Slf4j
@Component
public class ProductReceiver {
@Autowired
private ISkuStockService skuStockService;
/**
* 解锁库存
* @param orderNo 订单号
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_PRODUCT, durable = "true"),
value = @Queue(value = MqConst.QUEUE_UNLOCK, durable = "true"),
key = {MqConst.ROUTING_UNLOCK}
))
public void unlock(String orderNo, Message message, Channel channel) {
//业务处理
log.info("[商品服务]监听解锁库存消息:{}", orderNo);
//解锁库存
skuStockService.unlock(orderNo);
//手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
```
##### 4 ISkuStockService
```java
void unlock(String orderNo);
```
##### 5 skuStockServiceImpl
```java
@Transactional(rollbackFor = {Exception.class})
@Override
public void unlock(String orderNo) {
//幂等性处理
String key = "sku:unlock:" + orderNo;
Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, orderNo, 1, TimeUnit.HOURS);
if(!isFirst) return;
// 获取锁定库存的缓存信息
String dataKey = "sku:lock:data:" + orderNo;
List skuLockVoList = (List)this.redisTemplate.opsForValue().get(dataKey);
if (CollectionUtils.isEmpty(skuLockVoList)){
return;
}
// 解锁库存
skuLockVoList.forEach(skuLockVo -> {
int row = baseMapper.unlock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
});
// 解锁库存之后,删除锁定库存的缓存。
this.redisTemplate.delete(dataKey);
}
```
##### 6 SkuStockMapper
```java
Integer unlock(@Param("skuId") Long skuId, @Param("num")Integer num);
```
##### 7 SkuStockMapper.xml
```xml
update sku_stock
set lock_num = lock_num - #{num}, available_num = available_num + #{num}
where sku_id = #{skuId}
```