学习目标:
订单支付成功后,我们已经更改了订单支付记录状态,接下来我还有更改订单状态,因为他们是不同的微服务模块,所以我们采用消息队列的方式,保证数据最终一致性;
/**
* 订单支付
*/
public static final String EXCHANGE_DIRECT_PAYMENT_PAY = "exchange.direct.payment.pay";
public static final String ROUTING_PAYMENT_PAY = "payment.pay";
//队列
public static final String QUEUE_PAYMENT_PAY = "queue.payment.pay";
/**
* 减库存
*/
public static final String EXCHANGE_DIRECT_WARE_STOCK = "exchange.direct.ware.stock";
public static final String ROUTING_WARE_STOCK = "ware.stock";
//队列
public static final String QUEUE_WARE_STOCK = "queue.ware.stock";
/**
* 减库存成功,更新订单状态
*/
public static final String EXCHANGE_DIRECT_WARE_ORDER = "exchange.direct.ware.order";
public static final String ROUTING_WARE_ORDER = "ware.order";
//队列
public static final String QUEUE_WARE_ORDER = "queue.ware.order";
<dependency>
<groupId>com.atguigu.gmall</groupId>
<artifactId>rabbit-util</artifactId>
<version>1.0</version>
</dependency>
service-payment
模块com.atguigu.gmall.payment.service.impl.PaymentInfoServiceImpl#paySuccess方法
/**
* 更新本地交易记录表
*
* @param paymentInfo
* @param paramsMap
*/
@Override
public void paySuccess(PaymentInfo paymentInfo, Map<String, String> paramsMap) {
//1.设置本地交易记录表中字段值
paymentInfo.setTradeNo(paramsMap.get("trade_no"));
paymentInfo.setPaymentStatus(PaymentStatus.PAID.name());
paymentInfo.setCallbackTime(new Date());
paymentInfo.setCallbackContent(paramsMap.toString());
this.updateById(paymentInfo);
//2.发送消息到MQ 异步通知订单微服务修改订单支付状态
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_PAYMENT_PAY, MqConst.ROUTING_PAYMENT_PAY, paymentInfo.getOrderId());
}
service-order
模块中创建OrderReceiver类添加方法
/**
* 监听支付宝支付成功消息
*
* @param orderId 订单ID
* @param message
* @param channel
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MqConst.EXCHANGE_DIRECT_PAYMENT_PAY),
value = @Queue(value = MqConst.QUEUE_PAYMENT_PAY, durable = "true"),
key = MqConst.ROUTING_PAYMENT_PAY
))
public void processPaySuccessMsg(Long orderId, Message message, Channel channel) {
try {
if (orderId != null) {
//处理业务
log.info("[订单服务]监听支付成功消息:{}", orderId);
OrderInfo orderInfo = orderInfoService.getById(orderId);
if (orderInfo != null && !ProcessStatus.PAID.name().equals(orderInfo.getOrderStatus())) {
//1.监听到支付成功消息,修改订单状态为已支付
orderInfoService.updateOrderStatus(orderId, ProcessStatus.PAID);
//2.发送消息到MQ目的通知库存系统库存锁定/扣减
orderInfoService.sendDeductStockMsg(orderInfo.getOutTradeNo());
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
log.error("[订单服务]处理支付成功消息异常:{}", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
订单模块除了接收到请求改变单据状态,还要发送库存系统
查看看《库存管理系统接口手册》中【减库存的消息队列消费端接口】中的描述,组织相应的消息数据进行传递。
/**
* 构建扣减库存消息,通知库存系统库存锁定/扣减
* @param outTradeNo
*/
void sendDeductStockMsg(String outTradeNo);
/**
* 将订单订单明细转为Map 后续该方法被复用
* @param orderInfo
* @return
*/
Map initWareMap(OrderInfo orderInfo);
/**
* 构建扣减库存消息,通知库存系统库存锁定/扣减
*
* @param outTradeNo
*/
@Override
public void sendDeductStockMsg(String outTradeNo) {
//1.根据订单编号/订单ID查询订单,跟订单明细
OrderInfo orderInfo = this.getOrderInfoByOutradeNo(outTradeNo);
//2.将得到订单信息封装为库存系统所需Map对象
if (orderInfo != null) {
Map<String, Object> stockMap = initWareMap(orderInfo);
//3.发送扣减库存消息到MQ,通知库存系统进行商品库存锁定
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_WARE_STOCK, MqConst.ROUTING_WARE_STOCK, JSON.toJSONString(stockMap));
}
}
/**
* 将订单信息封装为库存系统所需要Map集合对象
*
* @param orderInfo
* @return
*/
private Map<String, Object> initWareMap(OrderInfo orderInfo) {
HashMap<String, Object> mapResult = new HashMap<>();
//1.封装Map中订单信息
mapResult.put("orderId", orderInfo.getId());
mapResult.put("consignee", orderInfo.getConsignee());
mapResult.put("consigneeTel", orderInfo.getConsigneeTel());
mapResult.put("orderComment", orderInfo.getOrderComment());
mapResult.put("orderBody", orderInfo.getTradeBody());
mapResult.put("deliveryAddress", orderInfo.getDeliveryAddress());
//orderInfo.getPaymentWay()=====ALIPAY 库存系统1:在线支付
mapResult.put("paymentWay", "1");
//2.封装Map中订单详情信息
List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();
if (!CollectionUtils.isEmpty(orderDetailList)) {
List<Map<String, Object>> details = orderDetailList.stream().map(orderDetail -> {
Map<String, Object> detailMap = new HashMap<>();
detailMap.put("skuId", orderDetail.getSkuId());
detailMap.put("skuNum", orderDetail.getSkuNum());
detailMap.put("skuName", orderDetail.getSkuName());
return detailMap;
}).collect(Collectors.toList());
mapResult.put("details", details);
}
return mapResult;
}
给仓库系统发送减库存消息后,还要接受减库存成功或者失败的消息。
同样根据《库存管理系统接口手册》中【商品减库结果消息】的说明完成。消费该消息的消息队列监听程序。
接受到消息后主要做的工作就是更新订单状态。
在订单项目中OrderReceiver
/**
* 监听库存系统锁定商品库存结果
*
* @param deductStr {"orderId":"64","status":"OUT_OF_STOCK"} 或者 {"orderId":"63","status":"DEDUCTED"}
* @param message
* @param channel
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MqConst.EXCHANGE_DIRECT_WARE_ORDER),
value = @Queue(value = MqConst.QUEUE_WARE_ORDER, durable = "true"),
key = MqConst.ROUTING_WARE_ORDER
))
public void processDeductStockResult(String deductStr, Message message, Channel channel) {
try {
if (StringUtils.isNotBlank(deductStr)) {
log.info("[订单服务],监听库存系统扣减结果:{}", deductStr);
//根据消息中扣减状态 修改指定订单状态
Map map = JSON.parseObject(deductStr, Map.class);
String orderId = (String) map.get("orderId");
String status = (String) map.get("status");
if (StringUtils.isNotBlank(orderId) && StringUtils.isNotBlank(status)) {
if ("DEDUCTED".equals(status)) {
orderInfoService.updateOrderStatus(Long.valueOf(orderId), ProcessStatus.WAITING_DELEVER);
}
if ("OUT_OF_STOCK".equals(status)) {
orderInfoService.updateOrderStatus(Long.valueOf(orderId), ProcessStatus.STOCK_EXCEPTION);
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (IOException e) {
e.printStackTrace();
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
YAPI接口地址:http://192.168.200.128:3000/project/11/interface/api/747
application-dev.yml
order:
split:
url: http://localhost:8204/api/order/orderSplit
service-order
订单模块OrderController
处理拆单请求
/**
* 提供给库存系统调用订单拆单接口
*
* @param orderId 订单ID
* @param wareSkuMap [{"wareId":"1","skuIds":["28","29"]},{"wareId":"2","skuIds":["24"]}]
* @return
*/
@PostMapping("/orderSplit")
public String orderSplit(String orderId, String wareSkuMap) {
return orderInfoService.orderSplit(orderId, wareSkuMap);
}
OrderInfoService
/**
* 提供给库存系统调用订单拆单接口
*
* @param orderId 订单ID
* @param wareSkuMap [{"wareId":"1","skuIds":["28","29"]},{"wareId":"2","skuIds":["24"]}]
* @return
*/
String orderSplit(String orderId, String wareSkuMap);
/**
* 根据订单ID查询订单以及订单明细
* @param orderId
* @return
*/
OrderInfo getOrderInfo(Long orderId);
OrderInfoServiceImpl
/**
* 拆分订单处理
* 1.根据提交订单ID查询原始的订单以及订单明细
* 2.遍历仓库跟商品SKU对应关系List 构建新的子订单 订单明细 进行保存
* 3.更新原始订单状态:SPLIT
* 4.按照接口文档构建响应结果[{orderId:1,wareId:"出货仓库ID",orderBody:"",details:[{},{}]},{}]
*
* @param map
* @return
*/
@Override
public String orderSplit(Long orderId, String wareSkuMapStr) {
List<OrderInfo> allSubOrderInfoList = new ArrayList<>();
//1.根据订单ID查询原始订单以及订单明细
OrderInfo orderInfoOrigin = this.getOrderInfo(orderId);
//原始订单中所有订单商品明细
List<OrderDetail> orderDetailOriginList = orderInfoOrigin.getOrderDetailList();
//2.根据提交仓库跟SKU对应关系,构建新子订单以及订单明细并保存
//将得到的字符串转为List集合
List<Map> wareSkuMap = JSON.parseArray(wareSkuMapStr, Map.class);
if (!CollectionUtils.isEmpty(wareSkuMap)) {
//2.1 遍历仓库商品SKU对照集合 对应产生几个子订单
wareSkuMap.stream().forEach(skuWareMap -> {
//获取仓库ID
String wareId = (String) skuWareMap.get("wareId");
//获取仓库包含SKUID集合
List<String> skuIds = (List<String>) skuWareMap.get("skuIds");
//使用Stream流式变成进行对集合数据过滤
List<OrderDetail> subOrderDetailList = orderDetailOriginList.stream().filter(orderDetail -> {
return skuIds.contains(orderDetail.getSkuId().toString());
}).collect(Collectors.toList());
//2.2 构建新子订单,设置上级订单;设置订单仓库ID;设置订单ID 执行保存
OrderInfo subOrderInfo = new OrderInfo();
BeanUtils.copyProperties(orderInfoOrigin, subOrderInfo);
subOrderInfo.setId(null); //避免主键冲突
subOrderInfo.setParentOrderId(orderInfoOrigin.getId());//上级订单ID
subOrderInfo.setWareId(wareId);// 订单对应出库仓库ID
//重新计算新子订单总金额
subOrderInfo.setOrderDetailList(subOrderDetailList);
//计算总金额
subOrderInfo.sumTotalAmount();
//执行保存子订单
this.save(subOrderInfo);
//2.3 构建新子订单的订单明细,设置所属订单新子订单 执行保存
//保存子订单项
for (OrderDetail orderDetail : subOrderDetailList) {
orderDetail.setOrderId(subOrderInfo.getId());
}
orderDetailService.saveBatch(subOrderDetailList);
allSubOrderInfoList.add(subOrderInfo);
});
}
//3.更新原始订单状态
this.updateOrderStatus(orderInfoOrigin.getId(), ProcessStatus.SPLIT);
//4.按照接口文档构建响应结果"[{orderId:1,wareId:"出货仓库ID",orderBody:"",details:[{},{}]},{}]"
List<Map> collect = allSubOrderInfoList.stream().map(orderInfo -> {
Map map = initWareOrder(orderInfo);
return map;
}).collect(Collectors.toList());
return JSON.toJSONString(collect);
}
/**
* 根据订单ID查询订单以及订单明细
*
* @param orderId
* @return
*/
@Override
public OrderInfo getOrderInfo(Long orderId) {
//1.根据订单ID查询订单信息
OrderInfo orderInfo = this.getById(orderId);
//2.根据订单ID查询订单明细列表
if (orderInfo != null) {
LambdaQueryWrapper<OrderDetail> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(OrderDetail::getOrderId, orderId);
List<OrderDetail> orderDetailList = orderDetailService.list(queryWrapper);
orderInfo.setOrderDetailList(orderDetailList);
}
return orderInfo;
}
修改initWareOrder方法 必须设置wareId参数提交到库存系统告知订单发货仓库ID
/**
* 将订单信息封装为库存系统所需要Map集合对象
*
* @param orderInfo
* @return
*/
private Map<String, Object> initWareMap(OrderInfo orderInfo) {
HashMap<String, Object> mapResult = new HashMap<>();
//1.封装Map中订单信息
mapResult.put("orderId", orderInfo.getId());
mapResult.put("consignee", orderInfo.getConsignee());
mapResult.put("consigneeTel", orderInfo.getConsigneeTel());
mapResult.put("orderComment", orderInfo.getOrderComment());
mapResult.put("orderBody", orderInfo.getTradeBody());
mapResult.put("deliveryAddress", orderInfo.getDeliveryAddress());
//orderInfo.getPaymentWay()=====ALIPAY 库存系统1:在线支付
mapResult.put("paymentWay", "1");
//TODO新增,指定当前订单出货仓库ID
mapResult.put("wareId", orderInfo.getWareId());
//2.封装Map中订单详情信息
List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();
if (!CollectionUtils.isEmpty(orderDetailList)) {
List<Map<String, Object>> details = orderDetailList.stream().map(orderDetail -> {
Map<String, Object> detailMap = new HashMap<>();
detailMap.put("skuId", orderDetail.getSkuId());
detailMap.put("skuNum", orderDetail.getSkuNum());
detailMap.put("skuName", orderDetail.getSkuName());
return detailMap;
}).collect(Collectors.toList());
mapResult.put("details", details);
}
return mapResult;
}
/**
* 关闭交易
*/
public static final String EXCHANGE_DIRECT_PAYMENT_CLOSE = "exchange.direct.payment.close";
public static final String ROUTING_PAYMENT_CLOSE = "payment.close";
//队列
public static final String QUEUE_PAYMENT_CLOSE = "queue.payment.close";
service-order
中修改PaymentReceiver监听器
/**
* 监听关闭订单消息:将订单关闭;todo 关闭本地交易记录,支付宝交易记录
*
* @param orderId 订单ID
* @param message
* @param channel
*/
@RabbitListener(queues = {MqConst.QUEUE_ORDER_CANCEL})
public void closeOrder(Long orderId, Message message, Channel channel) {
try {
//1.处理业务
if (orderId != null) {
log.info("【订单微服务】关闭订单消息:{}", orderId);
//1.1 根据订单ID查询订单状态 状态如果是未支付:将订单状态改为关闭
OrderInfo orderInfo = orderInfoService.getById(orderId);
if (orderId != null && OrderStatus.UNPAID.name().equals(orderInfo.getOrderStatus()) && OrderStatus.UNPAID.name().equals(orderInfo.getProcessStatus())) {
//1.2 修改订单状态-为关闭
orderInfoService.execExpiredOrder(orderId);
//1.3 发送关闭订单消息-关闭本地交易记录
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_PAYMENT_CLOSE, MqConst.ROUTING_PAYMENT_CLOSE, orderId);
}
}
//2.手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
log.error("【订单微服务】关闭订单业务异常:{}", e);
}
}
package com.atguigu.gmall.payment.receiver;
import com.atguigu.gmall.common.constant.MqConst;
import com.atguigu.gmall.enums.model.PaymentStatus;
import com.atguigu.gmall.enums.model.PaymentType;
import com.atguigu.gmall.payment.model.PaymentInfo;
import com.atguigu.gmall.payment.service.PaymentInfoService;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: atguigu
* @create: 2023-03-15 11:47
*/
@Slf4j
@Component
public class PaymentReceiver {
@Autowired
private PaymentInfoService paymentInfoService;
/**
* 监听关闭订单消息,将本地交易记录进行关闭
*
* @param orderId
* @param message
* @param channel
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MqConst.EXCHANGE_DIRECT_PAYMENT_CLOSE),
value = @Queue(value = MqConst.QUEUE_PAYMENT_CLOSE, durable = "true"),
key = {MqConst.ROUTING_PAYMENT_CLOSE}
))
public void processCloseOrderMsg(Long orderId, Message message, Channel channel) {
try {
if (orderId != null) {
log.info("[支付系统]监听关闭本地交易记录消息:{}", orderId);
//修改本地交易记录状态
PaymentInfo paymentInfo = paymentInfoService.getPaymentInfo(orderId, PaymentType.ALIPAY.name());
if (paymentInfo != null && PaymentStatus.UNPAID.name().equals(paymentInfo.getPaymentStatus())) {
//将交易状态改为关闭
paymentInfo.setPaymentStatus(PaymentStatus.CLOSED.name());
paymentInfoService.updateById(paymentInfo);
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (IOException e) {
log.error("[支付系统]监听关闭本地交易记录异常:", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
PaymentInfoService
/**
* 根据订单ID+付款方式查询本地交易记录
* @param orerId
* @param payType
* @return
*/
PaymentInfo getPaymentInfo(Long orerId, String payType);
PaymentInfoServiceImpl
/**
* 根据订单ID+支付方式查询本地交易记录
* @param orerId
* @param payType
* @return
*/
@Override
public PaymentInfo getPaymentInfo(Long orerId, String payType) {
LambdaQueryWrapper<PaymentInfo> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(PaymentInfo::getOrderId, orerId);
queryWrapper.eq(PaymentInfo::getPaymentType, payType);
return this.getOne(queryWrapper);
}
AlipayService接口
/***
* 关闭支付宝交易
* @param orderId
* @return
*/
void closeAliPay(Long orderId);
AlipayServiceImpl
/**
* 关闭支付宝交易
*
* @param orderId
* @return
*/
@Override
public void closeAliPay(Long orderId) {
try {
//1.根据订单ID+支付方式查询本地交易记录
PaymentInfo paymentInfo = paymentInfoService.getPaymentInfo(orderId, PaymentType.ALIPAY.name());
//2.调用支付宝接口关闭交易
if (paymentInfo != null) {
AlipayTradeCloseRequest request = new AlipayTradeCloseRequest();
JSONObject bizContent = new JSONObject();
bizContent.put("out_trade_no", paymentInfo.getOutTradeNo());
request.setBizContent(bizContent.toString());
AlipayTradeCloseResponse response = alipayClient.execute(request);
if (response.isSuccess()) {
log.info("关闭支付宝订单成功:{}", orderId);
} else {
log.error("关闭支付宝订单失败:{}", orderId);
}
}
} catch (AlipayApiException e) {
e.printStackTrace();
log.error("关闭支付宝订单失败:订单ID:{},异常信息:{}", orderId, e);
}
}
AlipayService
/**
* 根据订单查询交易是否可以支付
* @param orderId
* @return
*/
Boolean checkPaymentSatus(Long orderId);
AlipayServiceImpl
/**
* 查询支付宝交易订单状态
*
* @param orderId
* @return true:说明该订单等待支付
*/
@Override
public Boolean checkPaymentSatus(Long orderId) {
try {
//1.根据订单ID+支付方式查询本地交易记录
PaymentInfo paymentInfo = paymentInfoService.getPaymentInfo(orderId, PaymentType.ALIPAY.name());
if (paymentInfo != null) {
//2.调用支付宝接口查询交易状态
AlipayTradeQueryRequest request = new AlipayTradeQueryRequest();
JSONObject bizContent = new JSONObject();
bizContent.put("out_trade_no", paymentInfo.getOutTradeNo());
request.setBizContent(bizContent.toString());
AlipayTradeQueryResponse response = alipayClient.execute(request);
if (response.isSuccess()) {
String tradeStatus = response.getTradeStatus();
if ("WAIT_BUYER_PAY".equals(tradeStatus)) {
return true;
}
}
}
} catch (AlipayApiException e) {
e.printStackTrace();
}
return false;
}
service-payment
支付模块监听器中处理
package com.atguigu.gmall.payment.receiver;
import com.atguigu.gmall.common.constant.MqConst;
import com.atguigu.gmall.enums.model.PaymentStatus;
import com.atguigu.gmall.enums.model.PaymentType;
import com.atguigu.gmall.payment.model.PaymentInfo;
import com.atguigu.gmall.payment.service.AlipayService;
import com.atguigu.gmall.payment.service.PaymentInfoService;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: atguigu
* @create: 2023-03-15 11:47
*/
@Slf4j
@Component
public class PaymentReceiver {
@Autowired
private PaymentInfoService paymentInfoService;
@Autowired
private AlipayService alipayService;
/**
* 监听关闭订单消息,将本地交易记录进行关闭
*
* @param orderId
* @param message
* @param channel
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MqConst.EXCHANGE_DIRECT_PAYMENT_CLOSE),
value = @Queue(value = MqConst.QUEUE_PAYMENT_CLOSE, durable = "true"),
key = {MqConst.ROUTING_PAYMENT_CLOSE}
))
public void processCloseOrderMsg(Long orderId, Message message, Channel channel) {
try {
if (orderId != null) {
log.info("[支付系统]监听关闭本地交易记录消息:{}", orderId);
//修改本地交易记录状态
PaymentInfo paymentInfo = paymentInfoService.getPaymentInfo(orderId, PaymentType.ALIPAY.name());
if (paymentInfo != null && PaymentStatus.UNPAID.name().equals(paymentInfo.getPaymentStatus())) {
//将本地交易状态改为关闭
paymentInfo.setPaymentStatus(PaymentStatus.CLOSED.name());
paymentInfoService.updateById(paymentInfo);
//查询支付宝交易记录状态
Boolean flag = alipayService.checkPaymentSatus(orderId);
if (flag) {
//关闭支付宝交易
alipayService.closeAliPay(orderId);
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (IOException e) {
log.error("[支付系统]监听关闭本地交易记录异常:", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}