尚硅谷_尚品甄选_第7章_库存.md 22 KB

[TOC]

库存

1 RabbitMQ

1.1 搭建spzx-common-rabbit模块

由于消息队列是公共模块,我们把mq的相关代码(生产者)封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可

1.1.1 新建模块

spzx-common模块下新建spzx-common-rabbit模块

1.1.2 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.spzx</groupId>
        <artifactId>spzx-common</artifactId>
        <version>3.6.3</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spzx-common-rabbit</artifactId>

    <description>
        spzx-common-rabbit服务
    </description>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--rabbitmq消息队列-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- 缓存服务 -->
        <dependency>
            <groupId>com.spzx</groupId>
            <artifactId>spzx-common-redis</artifactId>
        </dependency>
    </dependencies>
</project>

1.1.3 RabbitService

package com.spzx.common.rabbit.service;

public class RabbitService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     *
     * @param exchange   交换机
     * @param routingKey 路由键
     * @param message    消息
     */
    public boolean sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        return true;
    }

}

1.1.4 加载配置类

在resources目录中创建

META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

com.spzx.common.rabbit.service.RabbitService

1.1.5 MqConst

提供常量类 MqConst

package com.spzx.common.rabbit.constant;

public class MqConst {

	/**
     * 测试
     */
    public static final String EXCHANGE_TEST = "spzx.exchange.test";
    public static final String ROUTING_TEST = "spzx.routing.test";
    public static final String ROUTING_CONFIRM = "spzx.routing.confirm";
    //队列
    public static final String QUEUE_TEST  = "spzx.queue.test";
    public static final String QUEUE_CONFIRM  = "spzx.queue.confirm";

    /**
     * 库存
     */
    public static final String EXCHANGE_PRODUCT = "spzx.exchange.product";
    public static final String ROUTING_UNLOCK = "spzx.routing.unlock";
    public static final String ROUTING_MINUS = "spzx.routing.minus";
    //队列
    public static final String QUEUE_UNLOCK  = "spzx.queue.unlock";
    public static final String QUEUE_MINUS  = "spzx.queue.minus";

    /**
     * 支付
     */
    public static final String EXCHANGE_PAYMENT_PAY = "spzx.exchange.payment";
    public static final String ROUTING_PAYMENT_PAY = "spzx.routing.payment.pay";
    public static final String ROUTING_PAYMENT_CLOSE = "spzx.routing.payment.close";;
    public static final String QUEUE_PAYMENT_PAY = "spzx.queue.payment.pay";
    public static final String QUEUE_PAYMENT_CLOSE  = "spzx.queue.payment.close";


    /**
     * 取消订单延迟消息
     */
    public static final String EXCHANGE_CANCEL_ORDER = "spzx.exchange.cancel.order";
    public static final String ROUTING_CANCEL_ORDER = "spzx.routing.cancel.order";
    public static final String QUEUE_CANCEL_ORDER = "spzx.queue.cancel.order";
    public static final Integer CANCEL_ORDER_DELAY_TIME = 15 * 60;
}

1.2 RabbitMQ测试

我们在spzx-order模块测试mq消息

1.2.1 配置RabbitMQ

在nacos配置中心,spzx-order-dev.yml文件添加配置

spring:
  rabbitmq:
    host: 192.168.100.131
    port: 5672
    username: guest
    password: guest

说明:host改为实际的IP

1.2.2 引入spzx-common-rabbit模块

spzx-order模块pom.xml文件添加依赖

<dependency>
    <groupId>com.spzx</groupId>
    <artifactId>spzx-common-rabbit</artifactId>
    <version>3.6.3</version>
</dependency>

1.2.3 MqController

发送消息

package com.spzx.order.controller;

@Tag(name = "Mq接口管理")
@RestController
@RequestMapping("/mq")
public class MqController extends BaseController
{
    @Autowired
    private RabbitService rabbitService;

    @Operation(summary = "发送消息")
    @GetMapping("/sendMessage")
    public AjaxResult sendMessage()
    {
        rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_TEST, "hello");
        return success();
    }

}

1.2.4 TestReceiver

监听消息:启动spzx-order服务,即可看到exchange、queue以及他们之间的路由绑定关系已经创建出来了

package com.spzx.order.receiver;


@Slf4j
@Component
public class TestReceiver {
    /**
     * 监听消息
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"),
            value = @Queue(value = MqConst.QUEUE_TEST, durable = "true"),
            key = MqConst.ROUTING_TEST
    ))
    public void test(String content, Message message) {
        //都可以
        log.info("接收消息:{}", content);
        log.info("接收消息:{}", new String(message.getBody()));
    }
}

1.2.5 knife4j测试

发送消息

70902075792

监听消息:查看idea打印结果

1.3 消息可靠性配置

1.3.1 介绍

MQ消息的可靠性,一般需要三个方面一起保证:

  1. 生产者不丢数据(可靠性投递)
  2. MQ服务器不丢数据(可靠性存储)
  3. 消费者不丢数据(可靠性消费)

1.3.2 消息发送确认配置

消息发送确认可以保证生产者不丢数据

1 封装发送端消息配置类

生产者不丢数据从两个方面保证:生产者确认机制、生产者退回机制

操作模块:spzx-common-rabbit

package com.spzx.common.rabbit.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class RabbitInitConfigApplicationListener implements ApplicationListener<ApplicationReadyEvent> {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        this.setupCallbacks();
    }

    private void setupCallbacks() {

		/**
         * 只确认消息是否正确到达 Exchange 中,成功与否都会回调
         *
         * @param correlation 相关数据  非消息本身业务数据
         * @param ack         应答结果
         * @param reason      如果发送消息到交换器失败,错误原因
         */
        this.rabbitTemplate.setConfirmCallback((correlationData, ack, reason) -> {
            if (ack) {
                //消息到交换器成功
                log.info("消息发送到Exchange成功:{}", correlationData);
            } else {
                //消息到交换器失败
                log.error("消息发送到Exchange失败:{}", reason);
            }
        });

        /**
         * 消息没有正确到达队列时触发回调,如果正确到达队列不执行
         */
     	//默认情况下mandatory的值是false
        //但是只要配置开启了publisher-returns则mandatory的值就是true
        //this.rabbitTemplate.setMandatory(true);//是否让rabbitmq将失败的消息的信息再次返回给生产者
        this.rabbitTemplate.setReturnsCallback(returned -> {
            log.error("返回: " + returned.getMessage()
                      + "\n 响应码: " + returned.getReplyCode()
                      + "\n 响应消息: " + returned.getReplyText() 
                      + "\n 交换机: " + returned.getExchange() 
                      + "\n 路由: " + returned.getRoutingKey());
        });
    }
}
2 加载配置类

resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

com.spzx.common.rabbit.config.RabbitInitConfigApplicationListener
3 修改配置

在nacos配置中心,修改spzx-order-dev.yml配置

spring:
  rabbitmq:
    host: 192.168.100.131
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: CORRELATED #开启生产者确认机制
    publisher-returns: true #开启生产者退回机制 
    listener:
      simple:
        acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
        prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
4 MqController

发送确认消息

@Operation(summary = "发送确认消息")
@GetMapping("/sendConfirmMessage")
public AjaxResult sendConfirmMessage()
{
    rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_CONFIRM, "hello, confirm");
    return success();
}
5 TestReceiver

监听确认消息

/**
 * 监听确认消息
 * @param message
 */
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"),
        value = @Queue(value = MqConst.QUEUE_CONFIRM, durable = "true"),
        key = MqConst.ROUTING_CONFIRM
))
public void confirm(String content, Message message, Channel channel) {
    log.info("接收确认消息:{}", content);

    // false 确认一个消息,true 批量确认
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

2 库存接口

我们的商品不允许超卖,为了防止超卖,我们下单必须检查与锁定库存,下单失败或取消订单要解锁库存,支付成功扣减库存

2.1 下单检查与锁定库存

2.1.1 远程调用接口

SkuLockVo

操作模块:spzx-api-product

package com.spzx.product.api.domain;

@Data
public class SkuLockVo
{

    private Long skuId;

    private Integer skuNum;
    
    private String skuName;

    /** 是否有库存 **/
    private boolean isHaveStock = false;
	//private Boolean isHaveStock = false;//Boolean和boolean生成的getter、setter不一样

}

操作模块:spzx-product

SkuStockMapper
//校验sku库存
SkuStock check(@Param("skuId") Long skuId, @Param("num")Integer num);
//锁定sku库存
Integer lock(@Param("skuId") Long skuId, @Param("num")Integer num);
SkuStockMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.spzx.product.mapper.SkuStockMapper">
    <!--悲观锁进行库存锁定检查-->
    <select id="checkStock" resultType="com.spzx.product.domain.SkuStock">
        SELECT * from sku_stock where sku_id = #{skuId} and available_num >= #{skuNum} and del_flag = 0 for update
    </select>

    <!--库存锁定-->
    <update id="lockStock">
        UPDATE sku_stock set lock_num = lock_num + #{skuNum} , available_num = available_num - #{skuNum} where sku_id = #{skuId} and  del_flag = 0;
    </update>
</mapper>
SkuStockController
package com.spzx.product.controller;

import com.spzx.common.core.domain.R;
import com.spzx.common.core.web.controller.BaseController;
import com.spzx.common.security.annotation.InnerAuth;
import com.spzx.product.service.ISkuStockService;
import com.spzx.product.vo.SkuLockVo;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * <p>
 * 商品sku库存表 前端控制器
 * </p>
 *
 * @author atguigu
 * @since 2025-04-28
 */
@RestController
@RequestMapping("/skuStock")
public class SkuStockController extends BaseController {

    @Autowired
    private ISkuStockService skuStockService;


    /**
     * 检查与锁定库存,要求不能出现库存超卖
     * @param orderNo
     * @param skuLockVoList
     * @return
     */
    @InnerAuth
    @Operation(summary = "检查与锁定库存")
    @PostMapping("/checkAndLock/{orderNo}")
    public R<String> checkAndLock(@PathVariable String orderNo, @RequestBody List<SkuLockVo> skuLockVoList) {
        String stockErrorMsg = skuStockService.checkAndLock(orderNo, skuLockVoList);
        return R.ok(stockErrorMsg);
    }

}
ISkuStockService
/**
 * 检查并锁定库存,避免库存“超卖”
 * @param orderNo
 * @param skuLockVoList
 * @return
 */
String checkAndLock(String orderNo, List<SkuLockVo> skuLockVoList);
SkuStockServiceImpl
@Autowired
private RedisTemplate redisTemplate;

/**
 * 检查并锁定库存,避免库存“超卖”
 *
 * @param orderNo
 * @param skuLockVoList
 * @return
 */
@Override
@Transactional(rollbackFor = Exception.class)
public String checkAndLock(String orderNo, List<SkuLockVo> skuLockVoList) {
    //1.遍历商品扣减VO列表,检查是否满足锁定库存条件,为VO中属性是否有库存赋值
    for (SkuLockVo skuLockVo : skuLockVoList) {
        //1.1 判断是否满足锁定库存,采用MySQL的悲观锁来进行锁定:select语句where条件+for update
        SkuStock skuStock = baseMapper.checkStock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
        //1.2 根据select悲观锁结果来判断是否有库存
        if (skuStock != null) {
            //有库存就将VO的属性设置为true
            skuLockVo.setHaveStock(true);
        } else {
            //没有库存,将VO的属性设置为false
            skuLockVo.setHaveStock(false);
        }
    }
    //2.如果有任意一件商品无法锁定,则锁定失败,封装锁定库存商品提示信息
    StringBuilder stockErrorMsg = new StringBuilder("");
    boolean flag = skuLockVoList.stream().anyMatch(skuLockVo -> !skuLockVo.isHaveStock());
    if (flag) {
        //2.1遍历商品库存锁定VO列表 找出所有没有锁定成功的商品
        for (SkuLockVo skuLockVo : skuLockVoList) {
            if (!skuLockVo.isHaveStock()) {
                //2.2 查询当前库存不足商品实时可用库存数量
                SkuStock skuStock = baseMapper.selectOne(
                        new LambdaQueryWrapper<SkuStock>().eq(SkuStock::getSkuId, skuLockVo.getSkuId())
                );
                //2.3 拼接错误提示信息
                stockErrorMsg.append(skuLockVo.getSkuName())
                        .append("库存不足,当前剩余:")
                        .append(skuStock.getAvailableNum())
                        .append(";");
            }
        }
    }
    if (StringUtils.isNotBlank(stockErrorMsg.toString())) {
        return stockErrorMsg.toString();
    }
    //3.如果全部商品可以锁定成功,则进行商品库存锁定
    for (SkuLockVo skuLockVo : skuLockVoList) {
        baseMapper.lockStock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
    }

    //4.将商品锁定库存信息存在Redis-用于取消订单解锁库存/订单支付成功最终库存扣减
    String dataKey = "sku:lock:data:" + orderNo;
    redisTemplate.opsForValue().set(dataKey, skuLockVoList);
    return null;
}

1.1.2、openFeign接口定义

操作模块:spzx-api-product

RemoteSkuStockService
@FeignClient(
        contextId = "remoteSkuStockService",
        value = ServiceNameConstants.PRODUCT_SERVICE,
        fallbackFactory = RemoteSkuStockFallbackFactory.class
)
public interface RemoteSkuStockService {

    @PostMapping("/skuStock/checkAndLock/{orderNo}")
    R<String> checkAndLock(
            @PathVariable("orderNo") String orderNo,
            @RequestBody List<SkuLockVo> skuLockVoList,
            @RequestHeader(SecurityConstants.FROM_SOURCE) String source
    );
}
RemoteSkuStockFallbackFactory
package com.spzx.product.api.factory;

public class RemoteSkuStockFallbackFactory implements FallbackFactory<RemoteSkuStockService> {

    @Override
    public RemoteSkuStockService create(Throwable cause) {
        return new RemoteSkuStockService() {
            @Override
            public R<String> checkAndLock(String orderNo, List<SkuLockVo> skuLockVoList, String source) {
                return R.fail("检查与锁定商品失败:" + cause.getMessage());
            }
        };
    }
}
加载配置类

org.springframework.boot.autoconfigure.AutoConfiguration.imports

com.spzx.product.api.fallback.RemoteSkuStockFallbackFactory

1.1.3、下单接口改造

操作模块:spzx-order

OrderInfoServiceImpl
@Autowired
private RemoteSkuStockService remoteSkuStockService;

@Transactional(rollbackFor = Exception.class)
@Override
public Long submitOrder(OrderForm orderForm) {
    
    ...

    //3 校验库存并锁定库存
    List<SkuLockVo> skuLockVoList = orderItemList.stream().map(item -> {
        SkuLockVo skuLockVo = new SkuLockVo();
        skuLockVo.setSkuId(item.getSkuId());
        skuLockVo.setSkuNum(item.getSkuNum());
        return skuLockVo;
    }).collect(Collectors.toList());
    String checkAndLockResult = remoteSkuStockService.checkAndLock(
        orderForm.getTradeNo(), 
        skuLockVoList, 
        SecurityConstants.INNER).getData();
    if(StringUtils.isNotEmpty(checkAndLockResult)) {
        throw new ServiceException(checkAndLockResult);
    }

    
    ...
        
    return orderId;
}

2.2 解锁库存

如果下单失败,则接口会抛出异常,那么我们需要将已锁定的库存进行解锁。

2.2.1 下单接口改造

操作模块:spzx-order

OrderInfoServiceImpl

下单异常时需要解锁库存:

Long orderId = null;
try {
    //下单
    orderId = this.saveOrder(orderForm);
} catch (Exception e) {
    e.printStackTrace();
    //下单失败,解锁库存
    rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderForm.getTradeNo());
    //抛出异常
    throw new ServiceException("下单失败");
}

取消订单时需要解锁库存:

@Override
@Transactional(rollbackFor = Exception.class)
public void cancelOrder(Long orderId) {
    OrderInfo orderInfo = baseMapper.selectById(orderId);
    if(null != orderInfo && orderInfo.getOrderStatus().intValue() == 0) {
        ......
            
        //发送MQ消息通知商品系统解锁库存:TODO
        rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderInfo.getOrderNo());
    }
}

2.2.2 远程调用接口

操作模块:spzx-product

1 pom.xml

添加依赖

<dependency>
    <groupId>com.spzx</groupId>
    <artifactId>spzx-common-rabbit</artifactId>
    <version>3.6.3</version>
</dependency>
2 spzx-product-dev.yml

添加配置

spring:
  rabbitmq:
    host: 192.168.100.131
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: CORRELATED
    publisher-returns: true
    listener:
      simple:
        cknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
        prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
3 ProductReceiver
package com.spzx.product.receiver;

@Slf4j
@Component
public class ProductReceiver {

    @Autowired
    private ISkuStockService skuStockService;

    /**
     * 解锁库存
     * @param orderNo 订单号
     */
    @SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConst.EXCHANGE_PRODUCT, durable = "true"),
            value = @Queue(value = MqConst.QUEUE_UNLOCK, durable = "true"),
            key = {MqConst.ROUTING_UNLOCK}
    ))
    public void unlock(String orderNo, Message message, Channel channel) {
        //业务处理
        log.info("[商品服务]监听解锁库存消息:{}", orderNo);
        //解锁库存
        skuStockService.unlock(orderNo);
        
        //手动应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}
4 ISkuStockService
void unlock(String orderNo);
5 skuStockServiceImpl
@Transactional(rollbackFor = {Exception.class})
@Override
public void unlock(String orderNo) {

    //幂等性处理
    String key = "sku:unlock:" + orderNo;
    Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, orderNo, 1, TimeUnit.HOURS);
    if(!isFirst) return;

    // 获取锁定库存的缓存信息
    String dataKey = "sku:lock:data:" + orderNo;
    List<SkuLockVo> skuLockVoList = (List<SkuLockVo>)this.redisTemplate.opsForValue().get(dataKey);
    if (CollectionUtils.isEmpty(skuLockVoList)){
        return;
    }

    // 解锁库存
    skuLockVoList.forEach(skuLockVo -> {
        int row = baseMapper.unlock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
    });

    // 解锁库存之后,删除锁定库存的缓存。
    this.redisTemplate.delete(dataKey);
}
6 SkuStockMapper
Integer unlock(@Param("skuId") Long skuId, @Param("num")Integer num);
7 SkuStockMapper.xml
<update id="unlock">
    update sku_stock
    set lock_num = lock_num - #{num}, available_num = available_num + #{num}
    where sku_id = #{skuId}
</update>