学习目标:
订单支付成功后,我们已经更改了订单支付记录状态,接下来我还有更改订单状态,因为他们是不同的微服务模块,所以我们采用消息队列的方式,保证数据最终一致性;
/**
* 订单支付
*/
public static final String EXCHANGE_DIRECT_PAYMENT_PAY = "exchange.direct.payment.pay";
public static final String ROUTING_PAYMENT_PAY = "payment.pay";
//队列
public static final String QUEUE_PAYMENT_PAY = "queue.payment.pay";
/**
* 减库存
*/
public static final String EXCHANGE_DIRECT_WARE_STOCK = "exchange.direct.ware.stock";
public static final String ROUTING_WARE_STOCK = "ware.stock";
//队列
public static final String QUEUE_WARE_STOCK = "queue.ware.stock";
/**
* 减库存成功,更新订单状态
*/
public static final String EXCHANGE_DIRECT_WARE_ORDER = "exchange.direct.ware.order";
public static final String ROUTING_WARE_ORDER = "ware.order";
//队列
public static final String QUEUE_WARE_ORDER = "queue.ware.order";
<dependency>
<groupId>com.atguigu.gmall</groupId>
<artifactId>rabbit-util</artifactId>
<version>1.0</version>
</dependency>
service-payment
模块com.atguigu.gmall.payment.service.impl.AlipayServiceImpl#paySuccessNotify方法
/**
* 当用户支付成功后,支付宝会自动调用该接口进行支付结果通知
*
* @param pramsMap
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public String notifyPaySuccess(Map<String, String> paramsMap) {
log.info(paramsMap.toString());
//1.进行幂等性处理 set nx
String notifyId = (String) paramsMap.get("notify_id");
String key = "alipay:notify:" + notifyId;
try {
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, notifyId, 25, TimeUnit.HOURS);
if (!flag) {
return "success";
}
//2.对提交参数进行验签,防止恶意“虚假通知”以及数据在网络传输中被篡改 //调用SDK验证签名
boolean signVerified = AlipaySignature.rsaCheckV1(paramsMap, AlipayConfig.alipay_public_key, AlipayConfig.charset, AlipayConfig.sign_type);
if (!signVerified) {
log.error("[支付服务]支付宝验签失败:{}", paramsMap.toString());
return "failure";
}
//3.验证订单是否属于商家
String outTradeNo = paramsMap.get("out_trade_no");
PaymentInfo paymentInfo = paymentInfoService.getPaymentInfo(outTradeNo, PaymentType.ALIPAY.name());
if (paymentInfo == null) {
return "failure";
}
if (!outTradeNo.equals(paymentInfo.getOutTradeNo())) {
return "failure";
}
//4.验证用户支付金额跟商家端金额是否一致
String totalAmount = paramsMap.get("total_amount");
if (new BigDecimal(totalAmount).compareTo(paymentInfo.getTotalAmount()) != 0) {
return "failure";
}
//5.验证商户端APPID跟支付宝提交是否一致
String appId = paramsMap.get("app_id");
if (!AlipayConfig.app_id.equals(appId)) {
return "failure";
}
//6.验证用户支付状态
String tradeStatus = paramsMap.get("trade_status");
if ("TRADE_SUCCESS".equals(tradeStatus)) {
//7.修改本地交易记录 将交易状态改为:“已支付”
paymentInfo.setPaymentStatus(PaymentStatus.PAID.name());
//支付宝交易号
String tradeNo = paramsMap.get("trade_no");
paymentInfo.setTradeNo(tradeNo);
paymentInfo.setCallbackTime(new Date());
paymentInfo.setCallbackContent(paramsMap.toString());
paymentInfoService.updateById(paymentInfo);
//TODO 8.基于MQ通知订单系统,修改订单状态
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_PAYMENT_PAY, MqConst.ROUTING_PAYMENT_PAY, paymentInfo.getOrderId());
return "success";
}
return null;
} catch (Exception e) {
redisTemplate.delete(key);
throw new RuntimeException(e);
}
}
service-order
模块中创建OrderReceiver类添加方法
package com.atguigu.gmall.order.reciever;
import com.atguigu.gmall.order.service.OrderInfoService;
import com.atguigu.gmall.rabbit.config.MqConst;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @author: atguigu
* @create: 2023-09-18 16:00
*/
@Slf4j
@Component
public class OrderReceiver {
@Autowired
private OrderInfoService orderInfoService;
/**
* 监听订单支付成功消息;更新订单状态;发送消息通知库存服务锁定库存
*
* @param orderId
* @param message
* @param channel
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_PAYMENT_PAY, durable = "true"),
value = @Queue(value = MqConst.QUEUE_PAYMENT_PAY, durable = "true"),
key = MqConst.ROUTING_PAYMENT_PAY
))
public void processPaySucess(Long orderId, Message message, Channel channel) {
//业务处理
if (orderId != null) {
log.info("[订单服务]监听订单支付成功消息:{}", orderId);
orderInfoService.processPaySucess(orderId);
}
//手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
OrderInfoService
/**
* 处理订单支付成功->修改订单状态,发送消息通知库存锁定库存
* @param orderId
*/
void processPaySucess(Long orderId);
OrderInfoServiceImpl
/**
* 处理订单支付成功->修改订单状态,发送消息通知库存锁定库存
*
* @param orderId
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void processPaySucess(Long orderId) {
//1.监听到订单被支付成功后业务处理只处理一次:幂等性处理
String key = "mq:pay:success:" + orderId;
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, orderId.toString(), 5, TimeUnit.MINUTES);
if (!flag) {
return;
}
//2.修改订单状态改为:已支付
this.updateOrderStatus(orderId, ProcessStatus.PAID);
//3.TODO 发送MQ消息通知库存系统锁定库存(查询文档得到库存系统所需参数)
}
订单模块除了接收到请求改变单据状态,还要发送库存系统
查看看《库存管理系统接口手册》中【减库存的消息队列消费端接口】中的描述,组织相应的消息数据进行传递。
/**
* 处理订单支付成功->修改订单状态,发送消息通知库存锁定库存
*
* @param orderId
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void processPaySucess(Long orderId) {
//1.监听到订单被支付成功后业务处理只处理一次:幂等性处理
String key = "mq:pay:success:" + orderId;
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, orderId.toString(), 5, TimeUnit.MINUTES);
if (!flag) {
return;
}
//2.修改订单状态改为:已支付
this.updateOrderStatus(orderId, ProcessStatus.PAID);
//3.TODO 发送MQ消息通知库存系统锁定库存(查询文档得到库存系统所需参数-JSON字符串包含订单相关信息,订单明细集合)
//3.1 根据订单ID查询订单及订单明细列表
OrderInfo orderInfo = this.getOrderInfo(orderId);
if (orderInfo != null) {
//3.2 基于订单相关信息封装对接库存系统Map 在发送消息到MQ将Map转为JSON字符串
Map<String, Object> mapResult = initWareMap(orderInfo);
//3.3 发送消息到MQ异步通知库存系统进行库存锁定或拆单
String deductMsg = JSON.toJSONString(mapResult);
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_WARE_STOCK, MqConst.ROUTING_WARE_STOCK, deductMsg);
}
}
/**
* 基于订单相关信息,封装库存系统所需参数
*
* @param orderInfo
* @return
*/
private Map<String, Object> initWareMap(OrderInfo orderInfo) {
Map<String, Object> mapResult = new HashMap<>();
//1.封装订单相关信息-7项数据
mapResult.put("orderId", orderInfo.getId());
mapResult.put("consignee", orderInfo.getConsignee());
mapResult.put("consigneeTel", orderInfo.getConsigneeTel());
mapResult.put("orderComment", orderInfo.getOrderComment());
mapResult.put("orderBody", orderInfo.getTradeBody());
mapResult.put("deliveryAddress", orderInfo.getDeliveryAddress());
mapResult.put("paymentWay", PaymentWay.ONLINE.name().equals(orderInfo.getPaymentWay()) ? "1" : "2");
//2.封装订单明细列表-3项数据
List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();
if (!CollectionUtils.isEmpty(orderDetailList)) {
//2.1 采用Stream流将集合泛型从OrderDetail转变为Map
List<Map<String, Object>> orderDetails = orderDetailList.stream().map(orderDetail -> {
Map<String, Object> orderDetailMap = new HashMap<>();
orderDetailMap.put("skuId", orderDetail.getSkuId());
orderDetailMap.put("skuNum", orderDetail.getSkuNum());
orderDetailMap.put("skuName", orderDetail.getSkuName());
return orderDetailMap;
}).collect(Collectors.toList());
mapResult.put("details", orderDetails);
}
return mapResult;
}
给仓库系统发送减库存消息后,还要接受减库存成功或者失败的消息。
同样根据《库存管理系统接口手册》中【商品减库结果消息】的说明完成。消费该消息的消息队列监听程序。
接受到消息后主要做的工作就是更新订单状态。
在订单项目中OrderReceiver
/**
* 监听(库存服务)库存扣减结果:根据结果修改订单状态
* 分析:当前业务不需要做幂等性处理;做事务管理
*
* @param stockDeductStr
* @param message
* @param channel
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_WARE_ORDER, durable = "true"),
value = @Queue(value = MqConst.QUEUE_WARE_ORDER, durable = "true"),
key = MqConst.ROUTING_WARE_ORDER
))
public void processStockDeductResult(String stockDeductStr, Message message, Channel channel) {
//业务处理
if (StringUtils.isNotBlank(stockDeductStr)) {
log.info("[订单服务]监听扣减库存结果:{}", stockDeductStr);
orderInfoService.processStockDeductResult(stockDeductStr);
}
//手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
OrderInfoServiceImpl处理
/**
* 处理库存扣减结果-根据结果更新订单状态
*
* @param stockDeductStr "{"orderId":61,"status":"DEDUCTED"}"
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void processStockDeductResult(String stockDeductStr) {
//1.将字符串转为Map
Map map = JSON.parseObject(stockDeductStr, Map.class);
String orderId = (String) map.get("orderId");
String status = (String) map.get("status");
//2.获取扣减状态更新订单
if ("DEDUCTED".equals(status)) {
//将订单状态状态修改为:代发货
this.updateOrderStatus(Long.valueOf(orderId), ProcessStatus.WAITING_DELEVER);
} else if ("OUT_OF_STOCK".equals(status)) {
this.updateOrderStatus(Long.valueOf(orderId), ProcessStatus.STOCK_EXCEPTION);
}
}
YAPI接口地址:http://192.168.200.128:3000/project/11/interface/api/747
service-order
订单模块OrderController
处理拆单请求
/**
* 提供给库存系统进行远程调用,完成订单拆单
*
* @param orderId 订单ID
* @param wareSkuMap "[{"wareId":1,skuIds:[27]},{"wareId":2,skuIds:[31]}]"
* @return
*/
@ApiOperation("订单拆单接口处理")
@PostMapping("/orderSplit")
public String orderSplit(@RequestParam("orderId") Long orderId, @RequestParam("wareSkuMap") String wareSkuMap) {
return orderInfoService.orderSplit(orderId, wareSkuMap);
}
OrderInfoService
/**
* 根据库存系统反馈进行拆单
* @param orderId 订单ID
* @param wareSkuMap 仓库跟商品Sku列表映射关系
* @return
*/
String orderSplit(Long orderId, String wareSkuMap);
OrderInfoServiceImpl
/**
* 提供给库存系统进行远程调用,完成订单拆单
* 本质:将一个原始订单变成多个新订单(每个子订单中包含订单明细)
*
* @param orderId 需要进行拆分原始订单ID
* @param wareSkuMap "[{"wareId":1,skuIds:[27]},{"wareId":2,skuIds:[31]}]"
* @return
*/
@Override
public String orderSplit(Long orderId, String wareSkuMap) {
//TODO 幂等性处理
if (orderId == null || StringUtils.isEmpty(wareSkuMap)) {
return null;
}
//1.根据原始订单ID查询原始订单及订单明细列表
OrderInfo originOrderInfo = this.getOrderInfo(orderId);
List<OrderDetail> originOrderDetailList = originOrderInfo.getOrderDetailList();
//2.将提交的仓库跟SkuID对照关系字符串转为Map List集合<仓库ID,skuId集合>
List<Map> skuWareMapList = JSON.parseArray(wareSkuMap, Map.class);
if (!CollectionUtils.isEmpty(skuWareMapList)) {
//3.循环遍历List 每遍历一次构建一个新子订单对象且找出子订单中订单明细 将新子订单、子订单明细进行保存
List<OrderInfo> allSubOrderInfoList = skuWareMapList.stream().map(map -> {
//3.1 获取当前订单对应出货仓库ID
String wareId = (String) map.get("wareId");
//3.2 当前订单中包含订单明细商品ID列表
List<String> skuIdList = (List<String>) map.get("skuIds");
//3.3 创建自订单对象-子订单对象中大部分属性从原始订单中拷贝获取
OrderInfo subOrderInfo = new OrderInfo();
BeanUtils.copyProperties(originOrderInfo, subOrderInfo);
subOrderInfo.setWareId(wareId);
//3.3.1 对原始订单明细进行过滤->得到当前新子订单订单明细计算总金额,订单概要
List<OrderDetail> subOrderDetailList = originOrderDetailList.stream().filter(originOrderDetail -> {
return skuIdList.contains(originOrderDetail.getSkuId().toString());
}).collect(Collectors.toList());
subOrderInfo.setOrderDetailList(subOrderDetailList);
subOrderInfo.sumTotalAmount();
String newTradeBody = subOrderDetailList.stream().map(OrderDetail::getSkuName).collect(Collectors.joining(","));
subOrderInfo.setTradeBody(newTradeBody);
//3.3.2 设置当前子订单上级订单ID
subOrderInfo.setParentOrderId(originOrderInfo.getId());
//3.3.3 保存新子订单
this.save(subOrderInfo);
Long subOrderId = subOrderInfo.getId();
//3.3.4 将所有子订单明细关联新子订单ID
for (OrderDetail orderDetail : subOrderDetailList) {
orderDetail.setOrderId(subOrderId);
}
orderDetailService.saveBatch(subOrderDetailList);
subOrderInfo.setOrderDetailList(subOrderDetailList);
return subOrderInfo;
}).collect(Collectors.toList());
//4.将原始订单状态改为已拆分(前端页面不需要展示原始订单)
originOrderInfo.setOrderStatus(OrderStatus.SPLIT.name());
this.updateById(originOrderInfo);
//5.响应业务数据返回库存系统-封装库存系统所需参数
List<Map<String, Object>> collect = allSubOrderInfoList.stream().map(subOrderInfo -> {
Map<String, Object> map = initWareMap(subOrderInfo);
return map;
}).collect(Collectors.toList());
return JSON.toJSONString(collect);
}
return null;
}
/**
* 基于订单相关信息,封装库存系统所需参数
*
* @param orderInfo
* @return
*/
private Map<String, Object> initWareMap(OrderInfo orderInfo) {
Map<String, Object> mapResult = new HashMap<>();
//1.封装订单相关信息-7项数据
mapResult.put("orderId", orderInfo.getId());
mapResult.put("consignee", orderInfo.getConsignee());
mapResult.put("consigneeTel", orderInfo.getConsigneeTel());
mapResult.put("orderComment", orderInfo.getOrderComment());
mapResult.put("orderBody", orderInfo.getTradeBody());
mapResult.put("deliveryAddress", orderInfo.getDeliveryAddress());
mapResult.put("paymentWay", PaymentWay.ONLINE.name().equals(orderInfo.getPaymentWay()) ? "1" : "2");
//拆单业务中必须要设置订单对应出货仓库ID
if (!StringUtils.isEmpty(orderInfo.getWareId())) {
mapResult.put("wareId", orderInfo.getWareId());
}
//2.封装订单明细列表-3项数据
List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();
if (!CollectionUtils.isEmpty(orderDetailList)) {
//2.1 采用Stream流将集合泛型从OrderDetail转变为Map
List<Map<String, Object>> orderDetails = orderDetailList.stream().map(orderDetail -> {
Map<String, Object> orderDetailMap = new HashMap<>();
orderDetailMap.put("skuId", orderDetail.getSkuId());
orderDetailMap.put("skuNum", orderDetail.getSkuNum());
orderDetailMap.put("skuName", orderDetail.getSkuName());
return orderDetailMap;
}).collect(Collectors.toList());
mapResult.put("details", orderDetails);
}
return mapResult;
}
订单持久层Mapper映射文件中,增加过滤订单条件,将拆分原始订单排除掉 OrderInfoMapper.xml
<!--查询用户订单列表-->
<select id="getOrderList" resultMap="orderInfoMap">
select
oi.id,
oi.consignee,
oi.consignee_tel,
oi.total_amount,
oi.order_status,
oi.user_id,
oi.payment_way,
oi.delivery_address,
oi.order_comment,
oi.out_trade_no,
oi.trade_body,
oi.create_time,
od.id order_deatil_id,
od.sku_id,
od.sku_name,
od.img_url,
od.order_price,
od.sku_num
from order_info oi inner join order_detail od on od.order_id = oi.id
where oi.user_id = #{userId} and oi.order_status != 'SPLIT'
order by oi.create_time
</select>
/**
* 关闭交易
*/
public static final String EXCHANGE_DIRECT_PAYMENT_CLOSE = "exchange.direct.payment.close";
public static final String ROUTING_PAYMENT_CLOSE = "payment.close";
//队列
public static final String QUEUE_PAYMENT_CLOSE = "queue.payment.close";
service-order
中修改com.atguigu.gmall.order.service.impl.OrderInfoServiceImpl#execCloseOrder方法
/**
* 根据订单ID查询订单支付状态,如果订单状态为:未支付,将其改为关闭
*
* @param orderId
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void execCloseOrder(Long orderId) {
//1.根据订单ID查询订单信息
OrderInfo orderInfo = this.getById(orderId);
//2.判断订单状态
if (orderId != null && PaymentStatus.UNPAID.name().equals(orderInfo.getOrderStatus())) {
this.updateOrderStatus(orderId, ProcessStatus.CLOSED);
}
//3.TODO 发送MQ消息通知支付系统关闭可能产生本地交易记录跟支付宝交易记录
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_PAYMENT_CLOSE, MqConst.ROUTING_PAYMENT_CLOSE, orderId);
}
package com.atguigu.gmall.payment.receiver;
import com.atguigu.gmall.payment.service.AlipayService;
import com.atguigu.gmall.payment.service.PaymentInfoService;
import com.atguigu.gmall.rabbit.config.MqConst;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author: atguigu
* @create: 2023-09-20 15:42
*/
@Slf4j
@Component
public class PaymentReceiver {
@Autowired
private AlipayService alipayService;
/**
* 监听订单关闭,将可能产生本地交易记录跟支付宝交易记录同样关闭
*
* @param orderId
* @param channel
* @param message
*/
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_PAYMENT_CLOSE, durable = "true"),
value = @Queue(value = MqConst.QUEUE_PAYMENT_CLOSE, durable = "true"),
key = MqConst.ROUTING_PAYMENT_CLOSE
))
public void processCloseTrade(Long orderId, Channel channel, Message message) {
//业务处理
if (orderId != null) {
log.info("[支付服务]监听关闭交易消息:{}", orderId);
alipayService.processCloseTrade(orderId);
}
//手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
AlipayService
/**
* @param orderId
*/
void processCloseTrade(Long orderId);
AlipayServiceImpl
/**
* 将可能产生本地交易记录跟支付宝交易记录同样关闭
*
* @param orderId
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void processCloseTrade(Long orderId) {
//1.先根据订单ID+支付方式查询本地交易记录 如果有该记录将状态改为关闭
PaymentInfo paymentInfo = paymentInfoService.getPaymentInfo(orderId, PaymentType.ALIPAY.name());
if (paymentInfo != null) {
paymentInfo.setPaymentStatus(PaymentStatus.CLOSED.name());
paymentInfoService.updateById(paymentInfo);
//2.调用支付宝接口查询是否产生支付宝交易,如果存在且未支付,将支付宝交易关闭(二维码也就不能付款)
//2.1 根据订单编号查询支付宝端交易状态
String tradeStatus = this.getAlipayTradeStatus(paymentInfo.getOutTradeNo());
if ("WAIT_BUYER_PAY".equals(tradeStatus)) {
//2.2 如果支付宝端交易状态为待支付,执行关闭交易
this.closeAlipayTrade(paymentInfo.getOutTradeNo());
}
}
}
AlipayService接口
/**
* 关闭支付宝交易
* @param outTradeNo
*/
void closeAlipayTrade(String outTradeNo);
AlipayServiceImpl
/**
* 关闭支付宝交易
*
* @param outTradeNo
*/
@Override
public void closeAlipayTrade(String outTradeNo) {
try {
AlipayTradeCloseRequest request = new AlipayTradeCloseRequest();
JSONObject bizContent = new JSONObject();
bizContent.put("out_trade_no", outTradeNo);
request.setBizContent(bizContent.toString());
AlipayTradeCloseResponse response = alipayClient.execute(request);
if (response.isSuccess()) {
log.info("支付宝关闭交易调用成功");
} else {
log.info("支付宝关闭交易调用失败");
}
} catch (Exception e) {
}
}
AlipayService
/**
* 查询支付宝端交易状态
*
* @param outTradeNo
* @return
*/
String getAlipayTradeStatus(String outTradeNo);
AlipayServiceImpl
/**
* 查询支付宝交易是否存在,如果存在判断状态
*
* @param outTradeNo
* @return
*/
@Override
public String getAlipayTradeStatus(String outTradeNo) {
try {
AlipayTradeQueryRequest request = new AlipayTradeQueryRequest();
JSONObject bizContent = new JSONObject();
bizContent.put("out_trade_no", outTradeNo);
request.setBizContent(bizContent.toString());
AlipayTradeQueryResponse response = alipayClient.execute(request);
if (response.isSuccess()) {
return response.getTradeStatus();
} else {
System.out.println("调用失败");
}
return null;
} catch (Exception e) {
//用户未扫码就会查询异常
return null;
}
}