[TOC] # 库存 ## 1 RabbitMQ ### 1.1 搭建spzx-common-rabbit模块 由于消息队列是公共模块,我们把mq的相关代码(生产者)封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可 #### 1.1.1 新建模块 在`spzx-common`模块下新建`spzx-common-rabbit`模块 #### 1.1.2 pom.xml ```xml com.spzx spzx-common 3.6.3 4.0.0 spzx-common-rabbit spzx-common-rabbit服务 org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-starter-bus-amqp com.alibaba.fastjson2 fastjson2 org.projectlombok lombok com.spzx spzx-common-redis ``` #### 1.1.3 RabbitService ```java package com.spzx.common.rabbit.service; public class RabbitService { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * * @param exchange 交换机 * @param routingKey 路由键 * @param message 消息 */ public boolean sendMessage(String exchange, String routingKey, Object message) { rabbitTemplate.convertAndSend(exchange, routingKey, message); return true; } } ``` #### 1.1.4 加载配置类 在resources目录中创建 META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports ```java com.spzx.common.rabbit.service.RabbitService ``` #### 1.1.5 MqConst 提供常量类 MqConst ```java package com.spzx.common.rabbit.constant; public class MqConst { /** * 测试 */ public static final String EXCHANGE_TEST = "spzx.exchange.test"; public static final String ROUTING_TEST = "spzx.routing.test"; public static final String ROUTING_CONFIRM = "spzx.routing.confirm"; //队列 public static final String QUEUE_TEST = "spzx.queue.test"; public static final String QUEUE_CONFIRM = "spzx.queue.confirm"; /** * 库存 */ public static final String EXCHANGE_PRODUCT = "spzx.exchange.product"; public static final String ROUTING_UNLOCK = "spzx.routing.unlock"; public static final String ROUTING_MINUS = "spzx.routing.minus"; //队列 public static final String QUEUE_UNLOCK = "spzx.queue.unlock"; public static final String QUEUE_MINUS = "spzx.queue.minus"; /** * 支付 */ public static final String EXCHANGE_PAYMENT_PAY = "spzx.exchange.payment"; public static final String ROUTING_PAYMENT_PAY = "spzx.routing.payment.pay"; public static final String ROUTING_PAYMENT_CLOSE = "spzx.routing.payment.close";; public static final String QUEUE_PAYMENT_PAY = "spzx.queue.payment.pay"; public static final String QUEUE_PAYMENT_CLOSE = "spzx.queue.payment.close"; /** * 取消订单延迟消息 */ public static final String EXCHANGE_CANCEL_ORDER = "spzx.exchange.cancel.order"; public static final String ROUTING_CANCEL_ORDER = "spzx.routing.cancel.order"; public static final String QUEUE_CANCEL_ORDER = "spzx.queue.cancel.order"; public static final Integer CANCEL_ORDER_DELAY_TIME = 15 * 60; } ``` ### 1.2 RabbitMQ测试 我们在`spzx-order`模块测试mq消息 #### 1.2.1 配置RabbitMQ 在nacos配置中心,spzx-order-dev.yml文件添加配置 ```yaml spring: rabbitmq: host: 192.168.100.131 port: 5672 username: guest password: guest ``` 说明:host改为实际的IP #### 1.2.2 引入spzx-common-rabbit模块 在`spzx-order`模块pom.xml文件添加依赖 ```xml com.spzx spzx-common-rabbit 3.6.3 ``` #### 1.2.3 MqController 发送消息 ```java package com.spzx.order.controller; @Tag(name = "Mq接口管理") @RestController @RequestMapping("/mq") public class MqController extends BaseController { @Autowired private RabbitService rabbitService; @Operation(summary = "发送消息") @GetMapping("/sendMessage") public AjaxResult sendMessage() { rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_TEST, "hello"); return success(); } } ``` #### 1.2.4 TestReceiver 监听消息:启动spzx-order服务,即可看到exchange、queue以及他们之间的路由绑定关系已经创建出来了 ```java package com.spzx.order.receiver; @Slf4j @Component public class TestReceiver { /** * 监听消息 * @param message */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"), value = @Queue(value = MqConst.QUEUE_TEST, durable = "true"), key = MqConst.ROUTING_TEST )) public void test(String content, Message message) { //都可以 log.info("接收消息:{}", content); log.info("接收消息:{}", new String(message.getBody())); } } ``` #### 1.2.5 knife4j测试 发送消息 ![70902075792](images/1709020757929-17320301769801.png) 监听消息:查看idea打印结果 ### 1.3 消息可靠性配置 #### 1.3.1 介绍 MQ消息的可靠性,一般需要三个方面一起保证: 1. 生产者不丢数据(可靠性投递) 2. MQ服务器不丢数据(可靠性存储) 3. 消费者不丢数据(可靠性消费) #### 1.3.2 消息发送确认配置 消息发送确认可以保证生产者不丢数据 ##### 1 封装发送端消息配置类 生产者不丢数据从两个方面保证:**生产者确认机制、生产者退回机制** 操作模块:`spzx-common-rabbit` ```java package com.spzx.common.rabbit.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class RabbitInitConfigApplicationListener implements ApplicationListener { @Autowired private RabbitTemplate rabbitTemplate; @Override public void onApplicationEvent(ApplicationReadyEvent event) { this.setupCallbacks(); } private void setupCallbacks() { /** * 只确认消息是否正确到达 Exchange 中,成功与否都会回调 * * @param correlation 相关数据 非消息本身业务数据 * @param ack 应答结果 * @param reason 如果发送消息到交换器失败,错误原因 */ this.rabbitTemplate.setConfirmCallback((correlationData, ack, reason) -> { if (ack) { //消息到交换器成功 log.info("消息发送到Exchange成功:{}", correlationData); } else { //消息到交换器失败 log.error("消息发送到Exchange失败:{}", reason); } }); /** * 消息没有正确到达队列时触发回调,如果正确到达队列不执行 */ //默认情况下mandatory的值是false //但是只要配置开启了publisher-returns则mandatory的值就是true //this.rabbitTemplate.setMandatory(true);//是否让rabbitmq将失败的消息的信息再次返回给生产者 this.rabbitTemplate.setReturnsCallback(returned -> { log.error("返回: " + returned.getMessage() + "\n 响应码: " + returned.getReplyCode() + "\n 响应消息: " + returned.getReplyText() + "\n 交换机: " + returned.getExchange() + "\n 路由: " + returned.getRoutingKey()); }); } } ``` ##### 2 加载配置类 resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports ```java com.spzx.common.rabbit.config.RabbitInitConfigApplicationListener ``` ##### 3 修改配置 在nacos配置中心,修改spzx-order-dev.yml配置 ```yaml spring: rabbitmq: host: 192.168.100.131 port: 5672 username: guest password: guest publisher-confirm-type: CORRELATED #开启生产者确认机制 publisher-returns: true #开启生产者退回机制 listener: simple: acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发 ``` ##### 4 MqController 发送确认消息 ```java @Operation(summary = "发送确认消息") @GetMapping("/sendConfirmMessage") public AjaxResult sendConfirmMessage() { rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_CONFIRM, "hello, confirm"); return success(); } ``` ##### 5 TestReceiver 监听确认消息 ```java /** * 监听确认消息 * @param message */ @SneakyThrows @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"), value = @Queue(value = MqConst.QUEUE_CONFIRM, durable = "true"), key = MqConst.ROUTING_CONFIRM )) public void confirm(String content, Message message, Channel channel) { log.info("接收确认消息:{}", content); // false 确认一个消息,true 批量确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } ``` ## 2 库存接口 我们的商品不允许超卖,为了防止超卖,我们下单必须检查与锁定库存,下单失败或取消订单要解锁库存,支付成功扣减库存 ### 2.1 下单检查与锁定库存 #### 2.1.1 远程调用接口 ##### SkuLockVo 操作模块:spzx-api-product ```java package com.spzx.product.api.domain; @Data public class SkuLockVo { private Long skuId; private Integer skuNum; private String skuName; /** 是否有库存 **/ private boolean isHaveStock = false; //private Boolean isHaveStock = false;//Boolean和boolean生成的getter、setter不一样 } ``` 操作模块:spzx-product ##### SkuStockMapper ```java //校验sku库存 SkuStock check(@Param("skuId") Long skuId, @Param("num")Integer num); //锁定sku库存 Integer lock(@Param("skuId") Long skuId, @Param("num")Integer num); ``` ##### SkuStockMapper.xml ```xml UPDATE sku_stock set lock_num = lock_num + #{skuNum} , available_num = available_num - #{skuNum} where sku_id = #{skuId} and del_flag = 0; ``` ##### SkuStockController ```java package com.spzx.product.controller; import com.spzx.common.core.domain.R; import com.spzx.common.core.web.controller.BaseController; import com.spzx.common.security.annotation.InnerAuth; import com.spzx.product.service.ISkuStockService; import com.spzx.product.vo.SkuLockVo; import io.swagger.v3.oas.annotations.Operation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.util.List; /** *

* 商品sku库存表 前端控制器 *

* * @author atguigu * @since 2025-04-28 */ @RestController @RequestMapping("/skuStock") public class SkuStockController extends BaseController { @Autowired private ISkuStockService skuStockService; /** * 检查与锁定库存,要求不能出现库存超卖 * @param orderNo * @param skuLockVoList * @return */ @InnerAuth @Operation(summary = "检查与锁定库存") @PostMapping("/checkAndLock/{orderNo}") public R checkAndLock(@PathVariable String orderNo, @RequestBody List skuLockVoList) { String stockErrorMsg = skuStockService.checkAndLock(orderNo, skuLockVoList); return R.ok(stockErrorMsg); } } ``` ##### ISkuStockService ```java /** * 检查并锁定库存,避免库存“超卖” * @param orderNo * @param skuLockVoList * @return */ String checkAndLock(String orderNo, List skuLockVoList); ``` ##### SkuStockServiceImpl ```java @Autowired private RedisTemplate redisTemplate; /** * 检查并锁定库存,避免库存“超卖” * * @param orderNo * @param skuLockVoList * @return */ @Override @Transactional(rollbackFor = Exception.class) public String checkAndLock(String orderNo, List skuLockVoList) { //1.遍历商品扣减VO列表,检查是否满足锁定库存条件,为VO中属性是否有库存赋值 for (SkuLockVo skuLockVo : skuLockVoList) { //1.1 判断是否满足锁定库存,采用MySQL的悲观锁来进行锁定:select语句where条件+for update SkuStock skuStock = baseMapper.checkStock(skuLockVo.getSkuId(), skuLockVo.getSkuNum()); //1.2 根据select悲观锁结果来判断是否有库存 if (skuStock != null) { //有库存就将VO的属性设置为true skuLockVo.setHaveStock(true); } else { //没有库存,将VO的属性设置为false skuLockVo.setHaveStock(false); } } //2.如果有任意一件商品无法锁定,则锁定失败,封装锁定库存商品提示信息 StringBuilder stockErrorMsg = new StringBuilder(""); boolean flag = skuLockVoList.stream().anyMatch(skuLockVo -> !skuLockVo.isHaveStock()); if (flag) { //2.1遍历商品库存锁定VO列表 找出所有没有锁定成功的商品 for (SkuLockVo skuLockVo : skuLockVoList) { if (!skuLockVo.isHaveStock()) { //2.2 查询当前库存不足商品实时可用库存数量 SkuStock skuStock = baseMapper.selectOne( new LambdaQueryWrapper().eq(SkuStock::getSkuId, skuLockVo.getSkuId()) ); //2.3 拼接错误提示信息 stockErrorMsg.append(skuLockVo.getSkuName()) .append("库存不足,当前剩余:") .append(skuStock.getAvailableNum()) .append(";"); } } } if (StringUtils.isNotBlank(stockErrorMsg.toString())) { return stockErrorMsg.toString(); } //3.如果全部商品可以锁定成功,则进行商品库存锁定 for (SkuLockVo skuLockVo : skuLockVoList) { baseMapper.lockStock(skuLockVo.getSkuId(), skuLockVo.getSkuNum()); } //4.将商品锁定库存信息存在Redis-用于取消订单解锁库存/订单支付成功最终库存扣减 String dataKey = "sku:lock:data:" + orderNo; redisTemplate.opsForValue().set(dataKey, skuLockVoList); return null; } ``` #### 1.1.2、openFeign接口定义 操作模块:spzx-api-product ##### RemoteSkuStockService ```java @FeignClient( contextId = "remoteSkuStockService", value = ServiceNameConstants.PRODUCT_SERVICE, fallbackFactory = RemoteSkuStockFallbackFactory.class ) public interface RemoteSkuStockService { @PostMapping("/skuStock/checkAndLock/{orderNo}") R checkAndLock( @PathVariable("orderNo") String orderNo, @RequestBody List skuLockVoList, @RequestHeader(SecurityConstants.FROM_SOURCE) String source ); } ``` ##### RemoteSkuStockFallbackFactory ```java package com.spzx.product.api.factory; public class RemoteSkuStockFallbackFactory implements FallbackFactory { @Override public RemoteSkuStockService create(Throwable cause) { return new RemoteSkuStockService() { @Override public R checkAndLock(String orderNo, List skuLockVoList, String source) { return R.fail("检查与锁定商品失败:" + cause.getMessage()); } }; } } ``` ##### 加载配置类 org.springframework.boot.autoconfigure.AutoConfiguration.imports ``` com.spzx.product.api.fallback.RemoteSkuStockFallbackFactory ``` #### 1.1.3、下单接口改造 操作模块:`spzx-order` ##### OrderInfoServiceImpl ```java @Autowired private RemoteSkuStockService remoteSkuStockService; @Transactional(rollbackFor = Exception.class) @Override public Long submitOrder(OrderForm orderForm) { ... //3 校验库存并锁定库存 List skuLockVoList = orderItemList.stream().map(item -> { SkuLockVo skuLockVo = new SkuLockVo(); skuLockVo.setSkuId(item.getSkuId()); skuLockVo.setSkuNum(item.getSkuNum()); return skuLockVo; }).collect(Collectors.toList()); String checkAndLockResult = remoteSkuStockService.checkAndLock( orderForm.getTradeNo(), skuLockVoList, SecurityConstants.INNER).getData(); if(StringUtils.isNotEmpty(checkAndLockResult)) { throw new ServiceException(checkAndLockResult); } ... return orderId; } ``` ### 2.2 解锁库存 如果下单失败,则接口会抛出异常,那么我们需要将已锁定的库存进行解锁。 #### 2.2.1 下单接口改造 操作模块:`spzx-order` ##### OrderInfoServiceImpl 下单异常时需要解锁库存: ```java Long orderId = null; try { //下单 orderId = this.saveOrder(orderForm); } catch (Exception e) { e.printStackTrace(); //下单失败,解锁库存 rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderForm.getTradeNo()); //抛出异常 throw new ServiceException("下单失败"); } ``` 取消订单时需要解锁库存: ```java @Override @Transactional(rollbackFor = Exception.class) public void cancelOrder(Long orderId) { OrderInfo orderInfo = baseMapper.selectById(orderId); if(null != orderInfo && orderInfo.getOrderStatus().intValue() == 0) { ...... //发送MQ消息通知商品系统解锁库存:TODO rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderInfo.getOrderNo()); } } ``` #### 2.2.2 远程调用接口 操作模块:`spzx-product` ##### 1 pom.xml 添加依赖 ```xml com.spzx spzx-common-rabbit 3.6.3 ``` ##### 2 spzx-product-dev.yml 添加配置 ```yaml spring: rabbitmq: host: 192.168.100.131 port: 5672 username: guest password: guest publisher-confirm-type: CORRELATED publisher-returns: true listener: simple: cknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发 ``` ##### 3 ProductReceiver ```java package com.spzx.product.receiver; @Slf4j @Component public class ProductReceiver { @Autowired private ISkuStockService skuStockService; /** * 解锁库存 * @param orderNo 订单号 */ @SneakyThrows @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = MqConst.EXCHANGE_PRODUCT, durable = "true"), value = @Queue(value = MqConst.QUEUE_UNLOCK, durable = "true"), key = {MqConst.ROUTING_UNLOCK} )) public void unlock(String orderNo, Message message, Channel channel) { //业务处理 log.info("[商品服务]监听解锁库存消息:{}", orderNo); //解锁库存 skuStockService.unlock(orderNo); //手动应答 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } ``` ##### 4 ISkuStockService ```java void unlock(String orderNo); ``` ##### 5 skuStockServiceImpl ```java @Transactional(rollbackFor = {Exception.class}) @Override public void unlock(String orderNo) { //幂等性处理 String key = "sku:unlock:" + orderNo; Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, orderNo, 1, TimeUnit.HOURS); if(!isFirst) return; // 获取锁定库存的缓存信息 String dataKey = "sku:lock:data:" + orderNo; List skuLockVoList = (List)this.redisTemplate.opsForValue().get(dataKey); if (CollectionUtils.isEmpty(skuLockVoList)){ return; } // 解锁库存 skuLockVoList.forEach(skuLockVo -> { int row = baseMapper.unlock(skuLockVo.getSkuId(), skuLockVo.getSkuNum()); }); // 解锁库存之后,删除锁定库存的缓存。 this.redisTemplate.delete(dataKey); } ``` ##### 6 SkuStockMapper ```java Integer unlock(@Param("skuId") Long skuId, @Param("num")Integer num); ``` ##### 7 SkuStockMapper.xml ```xml update sku_stock set lock_num = lock_num - #{num}, available_num = available_num + #{num} where sku_id = #{skuId} ```