学习目标:
订单支付成功后,我们已经更改了订单支付记录状态,接下来我还有更改订单状态,因为他们是不同的微服务模块,所以我们采用消息队列的方式,保证数据最终一致性;
/**
* 订单支付
*/
public static final String EXCHANGE_DIRECT_PAYMENT_PAY = "exchange.direct.payment.pay";
public static final String ROUTING_PAYMENT_PAY = "payment.pay";
//队列
public static final String QUEUE_PAYMENT_PAY = "queue.payment.pay";
/**
* 减库存
*/
public static final String EXCHANGE_DIRECT_WARE_STOCK = "exchange.direct.ware.stock";
public static final String ROUTING_WARE_STOCK = "ware.stock";
//队列
public static final String QUEUE_WARE_STOCK = "queue.ware.stock";
/**
* 减库存成功,更新订单状态
*/
public static final String EXCHANGE_DIRECT_WARE_ORDER = "exchange.direct.ware.order";
public static final String ROUTING_WARE_ORDER = "ware.order";
//队列
public static final String QUEUE_WARE_ORDER = "queue.ware.order";
<dependency>
<groupId>com.atguigu.gmall</groupId>
<artifactId>rabbit-util</artifactId>
<version>1.0</version>
</dependency>
service-payment
模块com.atguigu.gmall.payment.service.impl.AlipayServiceImpl#paySuccess方法
/**
* 修改本地交易记录状态
*
* @param paymentInfo
* @param paramsMap
*/
private void paySuccess(PaymentInfo paymentInfo, Map<String, String> paramsMap) {
//1.更新本地交易记录 跟 支付宝交易记录关联 后续进行日常对账
//1.1 支付宝支付交易编号凭据
String tradeNo = paramsMap.get("trade_no");
paymentInfo.setTradeNo(tradeNo);
paymentInfo.setCallbackTime(new Date());
paymentInfo.setCallbackContent(paramsMap.toString());
//2.更新本地交易记录状态
paymentInfo.setPaymentStatus(PaymentStatus.PAID.name());
paymentInfoService.updateById(paymentInfo);
//3.发送消息到MQ通知订单微服务修改订单状态
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_PAYMENT_PAY, MqConst.ROUTING_PAYMENT_PAY, paymentInfo.getOrderId());
}
service-order
模块中创建OrderReceiver类添加方法
package com.atguigu.gmall.order.receiver;
import com.atguigu.gmall.common.rabbit.config.MqConst;
import com.atguigu.gmall.enums.model.OrderStatus;
import com.atguigu.gmall.enums.model.ProcessStatus;
import com.atguigu.gmall.order.model.OrderInfo;
import com.atguigu.gmall.order.service.OrderInfoService;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author: atguigu
* @create: 2023-05-09 14:19
*/
@Slf4j
@Component
public class OrderReceiver {
@Autowired
private OrderInfoService orderInfoService;
/**
* 监听用户支付成功消息,修改订单状态,发送锁定锁定库存消息到库存服务
*
* @param orderId 订单ID
* @param channel
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_PAYMENT_PAY, durable = "true"),
value = @Queue(value = MqConst.QUEUE_PAYMENT_PAY, durable = "true"),
key = MqConst.ROUTING_PAYMENT_PAY
))
public void processPaySuccessResult(Long orderId, Channel channel, Message message) {
try {
if (orderId != null) {
log.info("[订单服务]监听用户支付成功消息,订单ID:{}", orderId);
//1.根据订单ID查询订单记录
OrderInfo orderInfo = orderInfoService.getOrderInfoById(orderId);
if (orderInfo != null) {
//2.更新订单支付状态:已支付
orderInfoService.updateOrderStatus(orderId, ProcessStatus.PAID);
//3.todo 发送消息到第三方库存系统 完成锁定 商品库存
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//TODO 重新入队,让MQ服务器再次投递消息 做好最大次数限制,做好幂等性处理
log.error("[订单服务]监听用户支付成功消息,处理失败:{}", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
订单模块除了接收到请求改变单据状态,还要发送库存系统
查看看《库存管理系统接口手册》中【减库存的消息队列消费端接口】中的描述,组织相应的消息数据进行传递。
/**
* 发送消息到MQ通知第三方库存系统锁定库存
* @param orderId
*/
void sendDeductLockMsg(Long orderId);
/**
* 构建第三方库存系统要求参数(包含订单信息,订单明细信息)
*/
Map initWareMap(OrderInfo orderInfo);
/**
* 发送消息到MQ通知第三方库存系统锁定库存
*
* @param orderId
*/
@Override
public void sendDeductLockMsg(Long orderId) {
//1.根据订单ID获取订单信息(包含订单明细)
OrderInfo orderInfo = this.getOrderInfoById(orderId);
if (orderInfo != null) {
//2.按照第三方库存系统要求接口参数封装
Map map = this.initWareMap(orderInfo);
//3.发送消息到MQ完成锁定库存
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_WARE_STOCK, MqConst.ROUTING_WARE_STOCK, JSON.toJSONString(map));
}
}
/**
* 构建第三方库存系统要求参数(包含订单信息,订单明细信息)
*/
@Override
public Map initWareMap(OrderInfo orderInfo) {
//1.构建结果Map
HashMap<String, Object> map = new HashMap<>();
//1.1 封装订单信息
map.put("orderId", orderInfo.getId());
map.put("consignee", orderInfo.getConsignee());
map.put("consigneeTel", orderInfo.getConsigneeTel());
map.put("orderComment", orderInfo.getOrderComment());
map.put("orderBody", orderInfo.getTradeBody());
map.put("deliveryAddress", orderInfo.getDeliveryAddress());
//todo 库存系统保存订单记录 支付方式 ‘1’ 为在线支付,‘2’为货到付款。
map.put("paymentWay", "1");
//1.2 封装订单明细
List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();
if (!CollectionUtils.isEmpty(orderDetailList)) {
List<HashMap<String, Object>> details = orderDetailList.stream().map(orderDetail -> {
HashMap<String, Object> detailMap = new HashMap<>();
detailMap.put("skuId", orderDetail.getSkuId());
detailMap.put("skuNum", orderDetail.getSkuNum());
detailMap.put("skuName", orderDetail.getSkuName());
return detailMap;
}).collect(Collectors.toList());
map.put("details", details);
}
return map;
}
给仓库系统发送减库存消息后,还要接受减库存成功或者失败的消息。
同样根据《库存管理系统接口手册》中【商品减库结果消息】的说明完成。消费该消息的消息队列监听程序。
接受到消息后主要做的工作就是更新订单状态。
在订单项目中OrderReceiver
/**
* 监听库存系统 锁定商品库存结果
*
* @param deductResultJsonStr "{orderId:101,status:'DEDUCTED|OUT_OF_STOCK'}"
* @param channel
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_WARE_ORDER, durable = "true"),
value = @Queue(value = MqConst.QUEUE_WARE_ORDER, durable = "true"),
key = MqConst.ROUTING_WARE_ORDER
))
public void processDeductResult(String deductResultJsonStr, Channel channel, Message message) {
try {
if (StringUtils.isNotBlank(deductResultJsonStr)) {
log.info("[订单服务]监听锁定库存结果:{}", deductResultJsonStr);
//1.根据锁定库存结果 更新订单状态
Map<String, String> map = JSON.parseObject(deductResultJsonStr, Map.class);
String orderId = map.get("orderId");
String status = map.get("status");
if (StringUtils.isNotBlank(orderId) && StringUtils.isNotBlank(status)) {
if ("DEDUCTED".equals(status)) {
//将订单处理状态修改为"待发货"
orderInfoService.updateOrderStatus(Long.valueOf(orderId), ProcessStatus.WAITING_DELEVER);
}
if ("OUT_OF_STOCK".equals(status)) {
//将订单处理状态修改为"库存异常"
orderInfoService.updateOrderStatus(Long.valueOf(orderId), ProcessStatus.STOCK_EXCEPTION);
}
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("[订单服务]监听锁定库存结果异常:{},消息内容:{}", e, deductResultJsonStr);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
YAPI接口地址:http://192.168.200.128:3000/project/11/interface/api/747
service-order
订单模块OrderApiController
处理拆单请求
/**
* 提供给库存系统进行拆单业务接口
*
* @param orderId 订单ID
* @param wareSkuMap 库存系统查询到仓库ID跟商品对照关系 "[{"wareId":"1","skuIds":["41"]},{"wareId":"2","skuIds":["26"]}]"
* @return 响应拆单后结果-包含每个订单对应出货仓库ID
*/
@PostMapping("/orderSplit")
public String orderSplit(@RequestParam("orderId") String orderId, @RequestParam("wareSkuMap") String wareSkuMap) {
return orderInfoService.orderSplit(orderId, wareSkuMap);
}
OrderInfoService
/**
* 提供给库存系统进行拆单业务接口
*
* @param orderId 订单ID
* @param wareSkuMap 库存系统查询到仓库ID跟商品对照关系 "[{"wareId":"1","skuIds":["41"]},{"wareId":"2","skuIds":["26"]}]"
* @return 响应拆单后结果-包含每个订单对应出货仓库ID
*/
String orderSplit(String orderId, String wareSkuMap);
OrderInfoServiceImpl
/**
* 提供给库存系统进行拆单业务接口
* 本质: 将原始订单以及订单明细 拆分为多子订单(新增) 跟 子订单明细(新增)
*
* @param orderId 原始订单ID
* @param wareSkuMapStr 库存系统查询到仓库ID跟商品对照关系 "[{"wareId":"1","skuIds":["41"]},{"wareId":"2","skuIds":["26"]}]"
* @return 响应拆单后结果-包含每个订单对应出货仓库ID
*/
@Override
public String orderSplit(String orderId, String wareSkuMapStr) {
//1.根据原始订单ID 查询原始订单信息以及原始订单明细信息
if (StringUtils.isNotBlank(orderId) && StringUtils.isNotBlank(wareSkuMapStr)) {
OrderInfo originOrderInfo = this.getOrderInfoById(Long.valueOf(orderId));
if (originOrderInfo != null && !OrderStatus.SPLIT.name().equals(originOrderInfo.getOrderStatus())) {
//原始订单订单明细
List<OrderDetail> originOrderDetailList = originOrderInfo.getOrderDetailList();
//2.将仓库ID 跟 商品ID 对照JSON字符串转为List集合-类型Map
List<Map> allWareSkuMapList = JSON.parseArray(wareSkuMapStr, Map.class);
if (!CollectionUtils.isEmpty(allWareSkuMapList)) {
//3.遍历集合每遍历一次产生子订单 得到子订单订单明细 将子订单跟订单明细新增
List<OrderInfo> allSubOrderInfoList = allWareSkuMapList.stream().map(wareSkuMap -> {
//3.1 新建子订单对象 大多数属性值来源于原始订单
OrderInfo subOrderInfo = new OrderInfo();
BeanUtils.copyProperties(originOrderInfo, subOrderInfo);
subOrderInfo.setId(null);//避免分布式ID出现主键冲突
subOrderInfo.setParentOrderId(originOrderInfo.getId()); //子订单关联原始订单
subOrderInfo.setUpdateTime(new Date());
//3.2 判断得到新子订单中订单明细集合
//3.2.1 获取到当前子订单中商品ID集合
List<String> skuIdStrList = (List<String>) wareSkuMap.get("skuIds");
//3.2.2 遍历原始订单明细集合将当前子订单中订单明细过滤出来
List<OrderDetail> newSubOrderDetailList = originOrderDetailList.stream().filter(originOrderDetail -> {
return skuIdStrList.contains(originOrderDetail.getSkuId().toString());
}).collect(Collectors.toList());
subOrderInfo.setOrderDetailList(newSubOrderDetailList);
subOrderInfo.sumTotalAmount();
//3.2.3 为新的子订单设置对应出货仓库ID
String wareId = (String) wareSkuMap.get("wareId");
subOrderInfo.setWareId(wareId);
//3.3 保存新子订单
this.save(subOrderInfo);
//3.4 todo 保存新订单订单明细
newSubOrderDetailList.stream().forEach(newSubOrderDetail -> {
//将新增子订单关联到订单明细
newSubOrderDetail.setOrderId(subOrderInfo.getId());
});
orderDetailService.saveBatch(newSubOrderDetailList);
return subOrderInfo;
}).collect(Collectors.toList());
//3.3.1 将原始订单状态改为 已拆单
originOrderInfo.setOrderStatus(OrderStatus.SPLIT.name());
this.updateById(originOrderInfo);
//4.按照接口文档响应结果给库存系统 TODO:保证每个订单包含对应出货仓库ID
List<Map> orderWareMap = allSubOrderInfoList.stream().map(subOrderInfo -> {
return this.initWareMap(subOrderInfo);
}).collect(Collectors.toList());
return JSON.toJSONString(orderWareMap);
}
}
}
return null;
}
订单持久层Mapper映射文件中,增加过滤订单条件,将拆分原始订单排除掉 OrderInfoMapper.xml
<!--查询用户订单列表-->
<select id="getOrderList" resultMap="orderInfoMap">
select
oi.id,
oi.consignee,
oi.consignee_tel,
oi.total_amount,
oi.order_status,
oi.user_id,
oi.payment_way,
oi.delivery_address,
oi.order_comment,
oi.out_trade_no,
oi.trade_body,
oi.create_time,
od.id order_deatil_id,
od.sku_id,
od.sku_name,
od.img_url,
od.order_price,
od.sku_num
from order_info oi inner join order_detail od on od.order_id = oi.id
where oi.user_id = #{userId} and oi.order_status != 'SPLIT'
order by oi.create_time
</select>
/**
* 关闭交易
*/
public static final String EXCHANGE_DIRECT_PAYMENT_CLOSE = "exchange.direct.payment.close";
public static final String ROUTING_PAYMENT_CLOSE = "payment.close";
//队列
public static final String QUEUE_PAYMENT_CLOSE = "queue.payment.close";
service-order
中修改com.atguigu.gmall.order.service.impl.OrderInfoServiceImpl#execExpiredOrder方法
/**
* 将订单关闭
*
* @param orderId
*/
@Override
public void execExpiredOrder(Long orderId) {
//1.关闭订单状态
this.updateOrderStatus(orderId, ProcessStatus.CLOSED);
//2.todo 新增 发送消息通知支付系统关闭交易记录
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_PAYMENT_CLOSE, MqConst.ROUTING_PAYMENT_CLOSE, orderId);
}
package com.atguigu.gmall.payment.receiver;
import com.atguigu.gmall.common.rabbit.config.MqConst;
import com.atguigu.gmall.enums.model.PaymentStatus;
import com.atguigu.gmall.enums.model.PaymentType;
import com.atguigu.gmall.payment.model.PaymentInfo;
import com.atguigu.gmall.payment.service.AlipayService;
import com.atguigu.gmall.payment.service.PaymentInfoService;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author: atguigu
* @create: 2023-05-12 15:13
*/
@Slf4j
@Component
public class PaymentReceiver {
@Autowired
private PaymentInfoService paymentInfoService;
@Autowired
private AlipayService alipayService;
/**
* 超时未支付订单,监听关闭本地交易消息
*
* @param orderId
* @param channel
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_PAYMENT_CLOSE, durable = "true"),
value = @Queue(value = MqConst.QUEUE_PAYMENT_CLOSE, durable = "true"),
key = MqConst.ROUTING_PAYMENT_CLOSE
))
public void procssPayClose(Long orderId, Channel channel, Message message) {
try {
if (orderId != null) {
log.info("[支付服务]监听到关闭交易记录消息:{}", orderId);
//1.根据订单ID查询本地交易记录
PaymentInfo paymentInfo = paymentInfoService.getPaymentInfoByOrderId(orderId, PaymentType.ALIPAY.name());
//2.判断本地交易记录状态 如果是:未支付 将状态改为关闭
if (paymentInfo != null && PaymentStatus.UNPAID.name().equals(paymentInfo.getPaymentStatus())) {
paymentInfo.setPaymentStatus(PaymentStatus.CLOSED.name());
paymentInfoService.updateById(paymentInfo);
//3.查询支付宝端交易状态 如果是:等待支付 关闭支付宝交易记录
String status = alipayService.getZfbTradeState(paymentInfo.getTradeNo(), paymentInfo.getOutTradeNo());
if ("WAIT_BUYER_PAY".equals(status)) {
alipayService.closeZfbTrade(paymentInfo.getTradeNo(), paymentInfo.getOutTradeNo());
}
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("支付服务]监听到关闭交易记录处理异常:消息{}", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
PaymentInfoService
/**
* 根据订单编号+支付方式 查询本地交易记录
* @param orderId
* @param paymentType
* @return
*/
PaymentInfo getPaymentInfoByOrderId(Long orderId, String paymentType);
PaymentInfoServiceImpl
/**
* 根据订单ID+支付方式查询本地交易记录
* @param orerId
* @param payType
* @return
*/
@Override
public PaymentInfo getPaymentInfoByOrderId(Long orderId, String paymentType) {
LambdaQueryWrapper<PaymentInfo> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(PaymentInfo::getOrderId, orderId);
queryWrapper.eq(PaymentInfo::getPaymentType, paymentType);
PaymentInfo paymentInfo = this.getOne(queryWrapper);
return paymentInfo;
}
AlipayService接口
/**
* 根据支付宝交易号,关闭支付宝交易(支付二维码失效)
*
* @param tradeNo
* @param outTradeNo
*/
void closeZfbTrade(String tradeNo, String outTradeNo);
AlipayServiceImpl
/**
* 根据支付宝交易号,关闭支付宝交易(支付二维码失效)
*
* @param tradeNo
* @param outTradeNo
*/
@Override
public void closeZfbTrade(String tradeNo, String outTradeNo) {
try {
AlipayTradeCloseRequest request = new AlipayTradeCloseRequest();
JSONObject bizContent = new JSONObject();
if (StringUtils.isNotBlank(tradeNo)) {
bizContent.put("trade_no", tradeNo);
}
if (StringUtils.isNotBlank(outTradeNo)) {
bizContent.put("out_trade_no", outTradeNo);
}
request.setBizContent(bizContent.toString());
alipayClient.execute(request);
} catch (Exception e) {
log.error("[支付服务],查询支付宝交易失败:{}", e);
}
}
AlipayService
/**
* 根据支付宝交易号,查询支付宝交易状态
* @param tradeNo
* @return
*/
String getZfbTradeState(String tradeNo, String outTradeNo);
AlipayServiceImpl
/**
* 根据支付宝交易号,查询支付宝交易状态
*
* @param tradeNo
* @return
*/
@Override
public String getZfbTradeState(String tradeNo, String outTradeNo) {
try {
AlipayTradeQueryRequest request = new AlipayTradeQueryRequest();
JSONObject bizContent = new JSONObject();
if (StringUtils.isNotBlank(tradeNo)) {
bizContent.put("trade_no", tradeNo);
}
if (StringUtils.isNotBlank(outTradeNo)) {
bizContent.put("out_trade_no", outTradeNo);
}
request.setBizContent(bizContent.toString());
AlipayTradeQueryResponse response = alipayClient.execute(request);
if (response.isSuccess()) {
String tradeStatus = response.getTradeStatus();
return tradeStatus;
}
return null;
} catch (AlipayApiException e) {
log.error("[支付服务],查询支付宝交易失败:{}", e);
throw new RuntimeException(e);
}
}
service-payment
支付模块监听器中处理
/**
* 超时未支付订单,监听关闭本地交易消息
*
* @param orderId
* @param channel
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_PAYMENT_CLOSE, durable = "true"),
value = @Queue(value = MqConst.QUEUE_PAYMENT_CLOSE, durable = "true"),
key = MqConst.ROUTING_PAYMENT_CLOSE
))
public void procssPayClose(Long orderId, Channel channel, Message message) {
try {
if (orderId != null) {
log.info("[支付服务]监听到关闭交易记录消息:{}", orderId);
//1.根据订单ID查询本地交易记录
PaymentInfo paymentInfo = paymentInfoService.getPaymentInfoByOrderId(orderId, PaymentType.ALIPAY.name());
//2.判断本地交易记录状态 如果是:未支付 将状态改为关闭
if (paymentInfo != null && PaymentStatus.UNPAID.name().equals(paymentInfo.getPaymentStatus())) {
paymentInfo.setPaymentStatus(PaymentStatus.CLOSED.name());
paymentInfoService.updateById(paymentInfo);
//3.查询支付宝端交易状态 如果是:等待支付 关闭支付宝交易记录
String status = alipayService.getZfbTradeState(paymentInfo.getTradeNo(), paymentInfo.getOutTradeNo());
if ("WAIT_BUYER_PAY".equals(status)) {
alipayService.closeZfbTrade(paymentInfo.getTradeNo(), paymentInfo.getOutTradeNo());
}
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("支付服务]监听到关闭交易记录处理异常:消息{}", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}