[TOC]
由于消息队列是公共模块,我们把mq的相关代码(生产者)封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可
在spzx-common
模块下新建spzx-common-rabbit
模块
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.spzx</groupId>
<artifactId>spzx-common</artifactId>
<version>3.6.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spzx-common-rabbit</artifactId>
<description>
spzx-common-rabbit服务
</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq消息队列-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- 缓存服务 -->
<dependency>
<groupId>com.spzx</groupId>
<artifactId>spzx-common-redis</artifactId>
</dependency>
</dependencies>
</project>
package com.spzx.common.rabbit.service;
public class RabbitService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*
* @param exchange 交换机
* @param routingKey 路由键
* @param message 消息
*/
public boolean sendMessage(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return true;
}
}
在resources目录中创建
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
com.spzx.common.rabbit.service.RabbitService
提供常量类 MqConst
package com.spzx.common.rabbit.constant;
public class MqConst {
/**
* 测试
*/
public static final String EXCHANGE_TEST = "spzx.exchange.test";
public static final String ROUTING_TEST = "spzx.routing.test";
public static final String ROUTING_CONFIRM = "spzx.routing.confirm";
//队列
public static final String QUEUE_TEST = "spzx.queue.test";
public static final String QUEUE_CONFIRM = "spzx.queue.confirm";
/**
* 库存
*/
public static final String EXCHANGE_PRODUCT = "spzx.exchange.product";
public static final String ROUTING_UNLOCK = "spzx.routing.unlock";
public static final String ROUTING_MINUS = "spzx.routing.minus";
//队列
public static final String QUEUE_UNLOCK = "spzx.queue.unlock";
public static final String QUEUE_MINUS = "spzx.queue.minus";
/**
* 支付
*/
public static final String EXCHANGE_PAYMENT_PAY = "spzx.exchange.payment";
public static final String ROUTING_PAYMENT_PAY = "spzx.routing.payment.pay";
public static final String ROUTING_PAYMENT_CLOSE = "spzx.routing.payment.close";;
public static final String QUEUE_PAYMENT_PAY = "spzx.queue.payment.pay";
public static final String QUEUE_PAYMENT_CLOSE = "spzx.queue.payment.close";
/**
* 取消订单延迟消息
*/
public static final String EXCHANGE_CANCEL_ORDER = "spzx.exchange.cancel.order";
public static final String ROUTING_CANCEL_ORDER = "spzx.routing.cancel.order";
public static final String QUEUE_CANCEL_ORDER = "spzx.queue.cancel.order";
public static final Integer CANCEL_ORDER_DELAY_TIME = 15 * 60;
}
我们在spzx-order
模块测试mq消息
在nacos配置中心,spzx-order-dev.yml文件添加配置
spring:
rabbitmq:
host: 192.168.100.131
port: 5672
username: guest
password: guest
说明:host改为实际的IP
在spzx-order
模块pom.xml文件添加依赖
<dependency>
<groupId>com.spzx</groupId>
<artifactId>spzx-common-rabbit</artifactId>
<version>3.6.3</version>
</dependency>
发送消息
package com.spzx.order.controller;
@Tag(name = "Mq接口管理")
@RestController
@RequestMapping("/mq")
public class MqController extends BaseController
{
@Autowired
private RabbitService rabbitService;
@Operation(summary = "发送消息")
@GetMapping("/sendMessage")
public AjaxResult sendMessage()
{
rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_TEST, "hello");
return success();
}
}
监听消息:启动spzx-order服务,即可看到exchange、queue以及他们之间的路由绑定关系已经创建出来了
package com.spzx.order.receiver;
@Slf4j
@Component
public class TestReceiver {
/**
* 监听消息
* @param message
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"),
value = @Queue(value = MqConst.QUEUE_TEST, durable = "true"),
key = MqConst.ROUTING_TEST
))
public void test(String content, Message message) {
//都可以
log.info("接收消息:{}", content);
log.info("接收消息:{}", new String(message.getBody()));
}
}
发送消息
监听消息:查看idea打印结果
MQ消息的可靠性,一般需要三个方面一起保证:
消息发送确认可以保证生产者不丢数据
生产者不丢数据从两个方面保证:生产者确认机制、生产者退回机制
操作模块:spzx-common-rabbit
package com.spzx.common.rabbit.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitInitConfigApplicationListener implements ApplicationListener<ApplicationReadyEvent> {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
this.setupCallbacks();
}
private void setupCallbacks() {
/**
* 只确认消息是否正确到达 Exchange 中,成功与否都会回调
*
* @param correlation 相关数据 非消息本身业务数据
* @param ack 应答结果
* @param reason 如果发送消息到交换器失败,错误原因
*/
this.rabbitTemplate.setConfirmCallback((correlationData, ack, reason) -> {
if (ack) {
//消息到交换器成功
log.info("消息发送到Exchange成功:{}", correlationData);
} else {
//消息到交换器失败
log.error("消息发送到Exchange失败:{}", reason);
}
});
/**
* 消息没有正确到达队列时触发回调,如果正确到达队列不执行
*/
//默认情况下mandatory的值是false
//但是只要配置开启了publisher-returns则mandatory的值就是true
//this.rabbitTemplate.setMandatory(true);//是否让rabbitmq将失败的消息的信息再次返回给生产者
this.rabbitTemplate.setReturnsCallback(returned -> {
log.error("返回: " + returned.getMessage()
+ "\n 响应码: " + returned.getReplyCode()
+ "\n 响应消息: " + returned.getReplyText()
+ "\n 交换机: " + returned.getExchange()
+ "\n 路由: " + returned.getRoutingKey());
});
}
}
resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
com.spzx.common.rabbit.config.RabbitInitConfigApplicationListener
在nacos配置中心,修改spzx-order-dev.yml配置
spring:
rabbitmq:
host: 192.168.100.131
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED #开启生产者确认机制
publisher-returns: true #开启生产者退回机制
listener:
simple:
acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
发送确认消息
@Operation(summary = "发送确认消息")
@GetMapping("/sendConfirmMessage")
public AjaxResult sendConfirmMessage()
{
rabbitService.sendMessage(MqConst.EXCHANGE_TEST, MqConst.ROUTING_CONFIRM, "hello, confirm");
return success();
}
监听确认消息
/**
* 监听确认消息
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_TEST, durable = "true"),
value = @Queue(value = MqConst.QUEUE_CONFIRM, durable = "true"),
key = MqConst.ROUTING_CONFIRM
))
public void confirm(String content, Message message, Channel channel) {
log.info("接收确认消息:{}", content);
// false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
我们的商品不允许超卖,为了防止超卖,我们下单必须检查与锁定库存,下单失败或取消订单要解锁库存,支付成功扣减库存
操作模块:spzx-api-product
package com.spzx.product.api.domain;
@Data
public class SkuLockVo
{
private Long skuId;
private Integer skuNum;
private String skuName;
/** 是否有库存 **/
private boolean isHaveStock = false;
//private Boolean isHaveStock = false;//Boolean和boolean生成的getter、setter不一样
}
操作模块:spzx-product
//校验sku库存
SkuStock check(@Param("skuId") Long skuId, @Param("num")Integer num);
//锁定sku库存
Integer lock(@Param("skuId") Long skuId, @Param("num")Integer num);
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.spzx.product.mapper.SkuStockMapper">
<!--悲观锁进行库存锁定检查-->
<select id="checkStock" resultType="com.spzx.product.domain.SkuStock">
SELECT * from sku_stock where sku_id = #{skuId} and available_num >= #{skuNum} and del_flag = 0 for update
</select>
<!--库存锁定-->
<update id="lockStock">
UPDATE sku_stock set lock_num = lock_num + #{skuNum} , available_num = available_num - #{skuNum} where sku_id = #{skuId} and del_flag = 0;
</update>
</mapper>
package com.spzx.product.controller;
import com.spzx.common.core.domain.R;
import com.spzx.common.core.web.controller.BaseController;
import com.spzx.common.security.annotation.InnerAuth;
import com.spzx.product.service.ISkuStockService;
import com.spzx.product.vo.SkuLockVo;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* <p>
* 商品sku库存表 前端控制器
* </p>
*
* @author atguigu
* @since 2025-04-28
*/
@RestController
@RequestMapping("/skuStock")
public class SkuStockController extends BaseController {
@Autowired
private ISkuStockService skuStockService;
/**
* 检查与锁定库存,要求不能出现库存超卖
* @param orderNo
* @param skuLockVoList
* @return
*/
@InnerAuth
@Operation(summary = "检查与锁定库存")
@PostMapping("/checkAndLock/{orderNo}")
public R<String> checkAndLock(@PathVariable String orderNo, @RequestBody List<SkuLockVo> skuLockVoList) {
String stockErrorMsg = skuStockService.checkAndLock(orderNo, skuLockVoList);
return R.ok(stockErrorMsg);
}
}
/**
* 检查并锁定库存,避免库存“超卖”
* @param orderNo
* @param skuLockVoList
* @return
*/
String checkAndLock(String orderNo, List<SkuLockVo> skuLockVoList);
@Autowired
private RedisTemplate redisTemplate;
/**
* 检查并锁定库存,避免库存“超卖”
*
* @param orderNo
* @param skuLockVoList
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public String checkAndLock(String orderNo, List<SkuLockVo> skuLockVoList) {
//1.遍历商品扣减VO列表,检查是否满足锁定库存条件,为VO中属性是否有库存赋值
for (SkuLockVo skuLockVo : skuLockVoList) {
//1.1 判断是否满足锁定库存,采用MySQL的悲观锁来进行锁定:select语句where条件+for update
SkuStock skuStock = baseMapper.checkStock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
//1.2 根据select悲观锁结果来判断是否有库存
if (skuStock != null) {
//有库存就将VO的属性设置为true
skuLockVo.setHaveStock(true);
} else {
//没有库存,将VO的属性设置为false
skuLockVo.setHaveStock(false);
}
}
//2.如果有任意一件商品无法锁定,则锁定失败,封装锁定库存商品提示信息
StringBuilder stockErrorMsg = new StringBuilder("");
boolean flag = skuLockVoList.stream().anyMatch(skuLockVo -> !skuLockVo.isHaveStock());
if (flag) {
//2.1遍历商品库存锁定VO列表 找出所有没有锁定成功的商品
for (SkuLockVo skuLockVo : skuLockVoList) {
if (!skuLockVo.isHaveStock()) {
//2.2 查询当前库存不足商品实时可用库存数量
SkuStock skuStock = baseMapper.selectOne(
new LambdaQueryWrapper<SkuStock>().eq(SkuStock::getSkuId, skuLockVo.getSkuId())
);
//2.3 拼接错误提示信息
stockErrorMsg.append(skuLockVo.getSkuName())
.append("库存不足,当前剩余:")
.append(skuStock.getAvailableNum())
.append(";");
}
}
}
if (StringUtils.isNotBlank(stockErrorMsg.toString())) {
return stockErrorMsg.toString();
}
//3.如果全部商品可以锁定成功,则进行商品库存锁定
for (SkuLockVo skuLockVo : skuLockVoList) {
baseMapper.lockStock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
}
//4.将商品锁定库存信息存在Redis-用于取消订单解锁库存/订单支付成功最终库存扣减
String dataKey = "sku:lock:data:" + orderNo;
redisTemplate.opsForValue().set(dataKey, skuLockVoList);
return null;
}
操作模块:spzx-api-product
@FeignClient(
contextId = "remoteSkuStockService",
value = ServiceNameConstants.PRODUCT_SERVICE,
fallbackFactory = RemoteSkuStockFallbackFactory.class
)
public interface RemoteSkuStockService {
@PostMapping("/skuStock/checkAndLock/{orderNo}")
R<String> checkAndLock(
@PathVariable("orderNo") String orderNo,
@RequestBody List<SkuLockVo> skuLockVoList,
@RequestHeader(SecurityConstants.FROM_SOURCE) String source
);
}
package com.spzx.product.api.factory;
public class RemoteSkuStockFallbackFactory implements FallbackFactory<RemoteSkuStockService> {
@Override
public RemoteSkuStockService create(Throwable cause) {
return new RemoteSkuStockService() {
@Override
public R<String> checkAndLock(String orderNo, List<SkuLockVo> skuLockVoList, String source) {
return R.fail("检查与锁定商品失败:" + cause.getMessage());
}
};
}
}
org.springframework.boot.autoconfigure.AutoConfiguration.imports
com.spzx.product.api.fallback.RemoteSkuStockFallbackFactory
操作模块:spzx-order
@Autowired
private RemoteSkuStockService remoteSkuStockService;
@Transactional(rollbackFor = Exception.class)
@Override
public Long submitOrder(OrderForm orderForm) {
...
//3 校验库存并锁定库存
List<SkuLockVo> skuLockVoList = orderItemList.stream().map(item -> {
SkuLockVo skuLockVo = new SkuLockVo();
skuLockVo.setSkuId(item.getSkuId());
skuLockVo.setSkuNum(item.getSkuNum());
return skuLockVo;
}).collect(Collectors.toList());
String checkAndLockResult = remoteSkuStockService.checkAndLock(
orderForm.getTradeNo(),
skuLockVoList,
SecurityConstants.INNER).getData();
if(StringUtils.isNotEmpty(checkAndLockResult)) {
throw new ServiceException(checkAndLockResult);
}
...
return orderId;
}
如果下单失败,则接口会抛出异常,那么我们需要将已锁定的库存进行解锁。
操作模块:spzx-order
下单异常时需要解锁库存:
Long orderId = null;
try {
//下单
orderId = this.saveOrder(orderForm);
} catch (Exception e) {
e.printStackTrace();
//下单失败,解锁库存
rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderForm.getTradeNo());
//抛出异常
throw new ServiceException("下单失败");
}
取消订单时需要解锁库存:
@Override
@Transactional(rollbackFor = Exception.class)
public void cancelOrder(Long orderId) {
OrderInfo orderInfo = baseMapper.selectById(orderId);
if(null != orderInfo && orderInfo.getOrderStatus().intValue() == 0) {
......
//发送MQ消息通知商品系统解锁库存:TODO
rabbitService.sendMessage(MqConst.EXCHANGE_PRODUCT, MqConst.ROUTING_UNLOCK, orderInfo.getOrderNo());
}
}
操作模块:spzx-product
添加依赖
<dependency>
<groupId>com.spzx</groupId>
<artifactId>spzx-common-rabbit</artifactId>
<version>3.6.3</version>
</dependency>
添加配置
spring:
rabbitmq:
host: 192.168.100.131
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED
publisher-returns: true
listener:
simple:
cknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
package com.spzx.product.receiver;
@Slf4j
@Component
public class ProductReceiver {
@Autowired
private ISkuStockService skuStockService;
/**
* 解锁库存
* @param orderNo 订单号
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_PRODUCT, durable = "true"),
value = @Queue(value = MqConst.QUEUE_UNLOCK, durable = "true"),
key = {MqConst.ROUTING_UNLOCK}
))
public void unlock(String orderNo, Message message, Channel channel) {
//业务处理
log.info("[商品服务]监听解锁库存消息:{}", orderNo);
//解锁库存
skuStockService.unlock(orderNo);
//手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
void unlock(String orderNo);
@Transactional(rollbackFor = {Exception.class})
@Override
public void unlock(String orderNo) {
//幂等性处理
String key = "sku:unlock:" + orderNo;
Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, orderNo, 1, TimeUnit.HOURS);
if(!isFirst) return;
// 获取锁定库存的缓存信息
String dataKey = "sku:lock:data:" + orderNo;
List<SkuLockVo> skuLockVoList = (List<SkuLockVo>)this.redisTemplate.opsForValue().get(dataKey);
if (CollectionUtils.isEmpty(skuLockVoList)){
return;
}
// 解锁库存
skuLockVoList.forEach(skuLockVo -> {
int row = baseMapper.unlock(skuLockVo.getSkuId(), skuLockVo.getSkuNum());
});
// 解锁库存之后,删除锁定库存的缓存。
this.redisTemplate.delete(dataKey);
}
Integer unlock(@Param("skuId") Long skuId, @Param("num")Integer num);
<update id="unlock">
update sku_stock
set lock_num = lock_num - #{num}, available_num = available_num + #{num}
where sku_id = #{skuId}
</update>