[TOC]
详情页面是被用户高频访问的,所以性能需要优化。一个系统最大的性能瓶颈,就是数据库的IO操作。从数据库入手也是调优性价比最高的切入点。
一般分为两个层面:
缓存最常见的4个问题:
查询一个不存在的数据,由于缓存无法命中,将去查询数据库,但是数据库也无此记录,并且我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。流量大时,可能DB就挂掉了。如果利用不存在的key频繁攻击应用,这就是程序漏洞。
解决方案1 :空结果也进行缓存,但它的过期时间会很短,最长不超过五分钟,但是不能防止随机穿透(例如利用skuid=uuid生成大量的随机id进行访问,这些id很可能大部分都不存在)。
解决方案2 :使用Redis的Bitmap或者布隆过滤器解决随机穿透问题 。
设置的缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重,导致雪崩。
解决方案1:在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,避免引发集体失效的事件。
解决方案2:可以采用集群部署方式,防止单节点宕机。
一个设置了过期时间的key,可能会在某些时间点被超高并发地访问(热点数据),如果这个key的缓存数据在大量请求同时进来时恰好失效,则所有对这个key的数据查询都会落到DB,称为缓存击穿。
总结:
雪崩:很多key集体失效
击穿:一个热点key失效,并同时被高并发访问
之前,我们学习过synchronized 及lock锁,这些锁都是本地锁。接下来写一个案例,演示本地锁的问题
在spzx-product
中新建TestController中添加测试方法
package com.spzx.product.controller;
@Tag(name = "测试接口")
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private TestService testService;
@GetMapping("/testLock")
public AjaxResult testLock() {
testService.testLock();
return AjaxResult.success();
}
}
业务接口
package com.spzx.product.service;
public interface TestService {
void testLock();
}
业务实现类
package com.spzx.product.service.impl;
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public void testLock() {
// 查询Redis中的num值
String value = this.stringRedisTemplate.opsForValue().get("num");
// 没有该值return
if (StringUtils.isBlank(value)){
return ;
}
// 有值就转成int
int num = Integer.parseInt(value);
// 把Redis中的num值+1
this.stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
}
}
说明:重启spzx-product
服务,通过reids客户端设置num=0
在linux 系统中安装 ab 压力测试工具 httpd-tools
yum install -y httpd-tools
配置本地锁测试接口为白名单
# 不校验白名单
ignore:
whites:
...省略
- /product/test/**
注意:测试前将Windows防火墙关闭!
命令语法:
ab -n(一次发送的请求数) -c(请求的并发数) 访问路径
测试如下:5000请求,100并发
ab -n 5000 -c 100 http://网关服务所在ip:8080/product/test/testLock
查看Redis中的值:
添加synchronized
关键字
@Override
public synchronized void testLock() {
}
使用ab工具压力测试:5000次请求,并发100
查看Redis中的结果:
与预期一致。是否真的完美?接下来再看集群情况下,会怎样?
接下来启动spzx-product
的9205、9215、9225三个运行实例:
-Dserver.port=9215
-Dserver.port=9225
ab -n 5000 -c 100 http://网关服务所在ip:8080/product/test/testLock
查看Redis中的值:
集群情况下又出问题了。
以上测试,可以发现:本地锁只能锁住同一工程内的资源,在分布式系统里存在局限性。此时需要分布式锁。
要解决的问题:单机部署情况下的并发控制锁策略失效,需要一种跨JVM的机制来控制共享资源的访问。
主流的实现方案:
SETNX
是 Redis 中的一个命令,用于设置键值对,但只有在键不存在的情况下才会执行成功。
SETNX
是 “SET if Not eXists” 的简称。它主要用于实现分布式锁和避免键值覆盖。
使用方法:
SETNX key value
返回值:
示例:
假设我们有一个键 mykey
:
127.0.0.1:6379> SETNX mykey "Hello"
(integer) 1
127.0.0.1:6379> SETNX mykey "World"
(integer) 0
127.0.0.1:6379> GET mykey
"Hello"
/**
* 采用SpringDataRedis实现分布式锁
* 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
*/
@Override
public void testLock() {
//0.先尝试获取锁 setnx key val
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock");
//stringRedisTemplate.expire("lock", 3, TimeUnit.SECONDS);
if(flag){
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
//System.out.println("value = " + value);
//System.out.println("出现异常");
//int a = 9/0;
/*try {
Thread.sleep(7000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放
stringRedisTemplate.delete("lock");
}else{
//获取锁失败,等待1秒后重试
try {
Thread.sleep(1000);
System.out.println("重试");
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
重启,服务集群,通过网关压力测试:
ab -n 5000 -c 100 http://ip:8080/product/test/testLock
查看Redis中的值:
死锁问题:如果有一个客户端在持有锁期间,业务逻辑出现异常,导致锁无法释放,会造成后续其他客户端一直等待。
注意:每次测试后在redis中删除lock锁
int serverPort = request.getServerPort();
if(serverPort == 9205){
int a = 9/0;
}
解决方案1::添加try catch finally
try{
String value = stringRedisTemplate.opsForValue().get("num");
if(serverPort == 9205){
int a = 9/0;
}
if (StringUtils.isBlank(value)) {
return;
}
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
} catch(Exception e){
System.out.println("异常捕获");
}finally{
//4.将锁释放
System.out.println("释放锁");
stringRedisTemplate.delete("lock");
}
如果遇到极端情况,服务器宕机,则无法执行finally,因此可以结合使用方案2
解决方案2:设置过期时间,自动释放锁。
设置过期时间有两种方式:
设置过期时间:
stringRedisTemplate.expire("lock", 3, TimeUnit.SECONDS);
设置过期时间:(注意:删除上一步的finally)
//0.先尝试获取锁:set命令代替setnx命令
//SET key value NX EX 3
//以上命令会在键不存在时设置键值,并同时设置一个3秒的过期时间,所有操作都在一个原子操作中完成。
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 3, TimeUnit.SECONDS);
压力测试肯定也没有问题。(注意:测试前删掉上一步中redis中没有释放的锁)
问题:可能会释放其他服务器的锁。
场景:如果业务逻辑的执行时间是7s。执行流程如下
index1业务逻辑没执行完,3秒后锁被自动释放。
index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
index3获取到锁,执行业务逻辑,1秒后index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。
最终等于没锁的情况。(注意:测试时删掉发生异常的语句)
结论:
解决:
@Override
public void testLock() throws InterruptedException {
//0.先尝试获取锁 setnx key val
String uuid = UUID.randomUUID().toString();
//过期时间设置的长一点
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
if(flag){
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放
if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))) {
stringRedisTemplate.delete("lock");
}
}else{
//获取锁成功,执行业务代码
Thread.sleep(10);
System.out.println("重试");
this.testLock();
}
}
问题:删除操作缺乏原子性。
场景:
1、index1执行删除时,查询到的lock值确实和uuid相等
if(uuid.equals(stringRedisTemplate.opsForValue().get("lock")))
2、index1执行删除前,lock刚好过期时间已到,被Redis自动释放,在Redis中没有了锁。
stringRedisTemplate.delete("lock");
3、index2获取了lock,index2线程获取到了cpu的资源,开始执行方法
4、index1执行删除,此时会把index2的lock删除:
stringRedisTemplate.delete("lock");
测试代码:调整各部分的时间
@Override
public void testLock() throws InterruptedException {
//0.先尝试获取锁 setnx key val
String uuid = UUID.randomUUID().toString();
//过期时间设置的长一点
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.MILLISECONDS);
if(flag){
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
Thread.sleep(20);
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放
if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))) {
Thread.sleep(20);
stringRedisTemplate.delete("lock");
}
}else{
//获取锁成功,执行业务代码
Thread.sleep(10);
System.out.println("重试");
this.testLock();
}
}
redis内置了LUA脚本解释器,redis客户端中通过EVAL可以执行LUA脚本
-- eval script numkeys key [key ...] arg [arg ...]
-- 1、返回执行结果 最后的0代表没有参数
EVAL "return 'hello!'" 0
-- 2、参数的使用 2代表传入的keys(通过KEYS[i]使用)有两个,其余的参数通过ARGV[i]获取使用 ..拼接字符串
EVAL "return KEYS[1]..','..KEYS[2]..','..ARGV[1]..','..ARGV[2]" 2 key1 key2 argv1 argv2
-- 3、局部变量的使用 local用来定义局部变量
EVAL "local a=KEYS[1] return a" 1 hehe
-- 4、条件判断 语句结束需要使用end, ~=不等于 ==等于
EVAL "local age=tonumber(KEYS[1]) if(age>=18) then return 'laoniao' elseif(age<18 and age>14) then return 'cainiao' else return 'pangju' end" 1 16
-- 5、LUA脚本执行redis命令
EVAL "return redis.call('set' , KEYS[1] , ARGV[1])" 1 a b
EVAL "return redis.call('set' , KEYS[1] , KEYS[2] , KEYS[3] , KEYS[4])" 4 a b ex 60
释放锁的LUA脚本:
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
redis java客户端使用LUA脚本:
//通过execute可以执行LUA脚本,参数1:脚本字符串,参数2:脚本返回值类型,参数3:keys列表,参数4:argv列表
stringRedisTemplate.execute(
new DefaultRedisScript<>(script , Boolean.class),
list,
args...
)
注意:
修改释放锁的方式,使用lua
@Override
public void testLock() throws InterruptedException {
//0.先尝试获取锁 setnx key val
String uuid = UUID.randomUUID().toString();
//过期时间设置的长一点
Boolean flag = stringRedisTemplate
.opsForValue()
.setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
if(flag){
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
Thread.sleep(20);
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放
//锁的释放
System.out.println("锁的释放");
String script = """
if redis.call('get', KEYS[1]) == ARGV[1]
then return redis.call('del', KEYS[1])
else
return 0
end
""";
Boolean lock = stringRedisTemplate.execute(
new DefaultRedisScript<>(script, Boolean.class),
Arrays.asList("lock"),
uuid);
System.out.println("result = " + lock);
}else{
//获取锁成功,执行业务代码
Thread.sleep(10);
System.out.println("重试");
this.testLock();
}
}
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下几个条件:
多进程可见:否则就无法实现分布式效果。
互斥性:同一时刻,只能有一个进程获得锁。
避免死锁:出现异常情况,保证锁可以被释放。
解铃还须系铃人:加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
原子性:加锁和解锁必须具有原子性。
高可用:避免锁服务宕机或处理好宕机的补救措施(redis集群架构)
ChannelController
//将
ProductSku productSku = productSkuService.getById(skuId);
//修改成
ProductSku productSku = productSkuService.getProductSku(skuId);
ProductSkuServiceImpl
@Autowired
private RedisTemplate redisTemplate;
/*
* 根据SkuID查询SKU商品信息
*
* @param skuId
* @return
*/
@Override
public ProductSku getProductSku(Long skuId) {
try {
//1.优先从缓存中获取数据
//1.1 构建业务数据Key 形式:前缀+业务唯一标识
String dataKey = "product:sku:" + skuId;
//1.2 查询Redis获取业务数据
ProductSku productSku = (ProductSku) redisTemplate.opsForValue().get(dataKey);
//1.3 命中缓存则直接返回
if (productSku != null) {
log.info("命中缓存,直接返回,线程ID:{},线程名称:{}", Thread.currentThread().getId(), Thread.currentThread().getName());
return productSku;
}
//2.尝试获取分布式锁(set k v nx ex 可能获取锁失败)
//2.1 构建锁key
String lockKey = "product:sku:lock:" + skuId;
//2.2 采用UUID作为线程标识
String lockVal = UUID.randomUUID().toString().replaceAll("-", "");
//2.3 利用Redis提供set nx ex 获取分布式锁
Boolean flag = redisTemplate.opsForValue().setIfAbsent(lockKey, lockVal, 5, TimeUnit.SECONDS);
if (flag) {
//3.获取锁成功执行业务,将查询业务数据放入缓存Redis
log.info("获取锁成功:{},线程名称:{}", Thread.currentThread().getId(), Thread.currentThread().getName());
try {
//再次检查缓存,是否有数据
productSku = (ProductSku)redisTemplate.opsForValue().get(dataKey);
if(productSku != null){
return productSku;
}
productSku = baseMapper.selectById(skuId);
//防止缓存穿透,将null值转换成新对象存入数据库,并且设置较短的过期时间
long ttl = productSku == null ? 1 * 60 : 10 * 60;
if(productSku == null){
productSku = new ProductSku();
}
//将查询数据库结果放入缓存
redisTemplate.opsForValue().set(dataKey, productSku, ttl, TimeUnit.SECONDS);
return productSku;
} finally {
//4.业务执行完毕释放锁
String scriptText = """
if redis.call('get',KEYS[1]) == ARGV[1]
then
return redis.call('del',KEYS[1])
else
return 0
end
""";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(scriptText);
redisScript.setResultType(Long.class);
redisTemplate.execute(redisScript, Arrays.asList(lockKey), lockVal);
}
} else {
try {
//5.获取锁失败则自旋(业务要求必须执行)
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.error("获取锁失败,自旋:{},线程名称:{}", Thread.currentThread().getId(), Thread.currentThread().getName());
return this.getProductSku(skuId);
}
} catch (Exception e) {
//兜底处理方案:Redis服务有问题,将业务数据获取自动从数据库获取
log.error("[商品服务]查询商品信息异常:{}", e);
return baseMapper.selectById(skuId);
}
}
Redis 的 Bitmap(位图)是一种特殊的字符串数据类型,它利用字符串类型键(key)来存储一系列连续的二进制位(bits),每个位可以独立地表示一个布尔值(0 或 1)。这种数据结构非常适合用于存储和操作大量二进状态的数据,尤其在需要高效空间利用率和特定位操作场景中表现出色。
setbit key offset value
:设置或清除指定偏移量上的位(bit)。offset
是从0开始的位索引,value
可以为 0 或 1。getbit key offset
:返回指定偏移量上的位值。在spzx-product
模块的启动类中实现CommandLineRunner接口
@EnableCustomConfig
@EnableRyFeignClients
@SpringBootApplication
public class SpzxProductApplication implements CommandLineRunner{
public static void main(String[] args) {
SpringApplication.run(SpzxProductApplication.class,args);
}
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private ProductSkuMapper productSkuMapper;
//Springboot应用初始化后会执行一次该方法
@Override
public void run(String... args) throws Exception {
String key = "product:sku:data";
//查询mysql里面商品skuId
List<ProductSku> productSkuList = productSkuMapper.selectList(null);
productSkuList.forEach(item -> {
//添加到redis
redisTemplate.opsForValue().setBit(key,item.getId(),true);
});
}
}
ChannelController的item方法中,首先从Bitmap中获取数据
@Autowired
private RedisTemplate redisTemplate;
@Operation(summary = "商品详情")
@GetMapping("/item/{skuId}")
public AjaxResult item(
@Parameter(description = "skuId")
@PathVariable Long skuId) {
String key = "product:sku:data";
Boolean flag = redisTemplate.opsForValue().getBit(key, skuId);
if (!flag) {
log.error("用户查询商品sku不存在:{}", skuId);
//查询数据不存在直接返回空对象
throw new ServiceException("用户查询商品sku不存在");
}
...
return success(itemVo);
}
商品上架时将数据添加到Bitmap中
ProductServiceImpl.updateStatus方法
@Autowired
private RedisTemplate redisTemplate;
@Transactional(rollbackFor = Exception.class)
@Override
public void updateStatus(Long id, Integer status) {
Product product = new Product();
product.setId(id);
product.setStatus(status);
baseMapper.updateById(product);
//更新sku的status
productSkuMapper.update(
null,
new LambdaUpdateWrapper<ProductSku>()
.eq(ProductSku::getProductId, id)
.set(ProductSku::getStatus, status));
String key = "product:sku:data";
List<ProductSku> productSkuList = productSkuMapper.selectList(
new LambdaQueryWrapper<ProductSku>().eq(ProductSku::getProductId, id));
if (status == 1) {
// 商品上架时,将 sku ID 列表放入 bitmap
productSkuList.forEach(item -> {
redisTemplate.opsForValue().setBit(key, item.getId(), true);
});
} else if (status == -1) {
// 商品下架时,将 sku ID 列表从 bitmap 中移除
productSkuList.forEach(item -> {
redisTemplate.opsForValue().setBit(key, item.getId(), false);
});
}
}
问题:查询商品详情页的逻辑非常复杂,数据的获取都需要业务调用,必然需要花费更多的时间。
假如商品详情页的每个查询,需要如下标注的时间才能完成
那么,用户需要3s后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这些操作,就会更快得到响应。
CompletableFuture
可以使原本串行执行的代码,变为并行执行,提高代码执行速度。
全局自定义线程池配置
package com.spzx.product.configure;
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
int processorsCount = Runtime.getRuntime().availableProcessors();
int corePoolSize = processorsCount; // 设为处理器数量
int maximumPoolSize = processorsCount * 2; // 最大线程数设为核心线程的两倍
long keepAliveTime = 60L; // 保持活跃时间为 60 秒
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // 使用有界队列控制任务数量
Executors.defaultThreadFactory(),
//new ThreadPoolExecutor.CallerRunsPolicy()
//自定义拒绝策略
(runnable, executor) -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
//再次将拒绝任务提交给线程池执行
executor.submit(runnable);
}
);
//线程会在需要时创建
return threadPoolExecutor;
}
}
processorsCount
,即每个处理器一个线程。processorsCount * 2
,允许在高峰负载时增加线程数。keepAliveTime
设为 60 秒,在负载高峰过后可以减少活动线程数,减少资源占用。ArrayBlockingQueue
作为任务队列,并设置一个合适的尺寸,可以控制队列中最多只能存放多少个等待执行的任务。如果所有线程都在忙碌并且队列达到容量,则会根据拒绝策略处理新增任务。@Autowired
private ThreadPoolExecutor threadPoolExecutor;
@Operation(summary = "商品详情")
@GetMapping("/item/{skuId}")
public AjaxResult item(
@Parameter(description = "skuId")
@PathVariable Long skuId) {
//0、Bitmap
......
ItemVo itemVo = new ItemVo();
//有返回值的异步方法
CompletableFuture<ProductSku> futureProductSku = CompletableFuture.supplyAsync(() -> {
//获取sku信息
ProductSku productSku = productSkuService.getById(skuId);
itemVo.setProductSku(productSku);
return productSku;
}, threadPoolExecutor);
//异步执行,并且获取商品sku方法的返回值,本身没有返回值
CompletableFuture<Void> futureProduct = futureProductSku.thenAcceptAsync(productSku -> {
//获取商品id
Long productId = productSku.getProductId();
//获取商品信息
Product product = productService.getById(productId);
//商品信息:轮播图列表
String[] sliderUrlList = product.getSliderUrls().split(",");
itemVo.setSliderUrlList(sliderUrlList);
//商品信息:商品规格列表
JSONArray specValueList = JSON.parseArray(product.getSpecValue());
itemVo.setSpecValueList(specValueList);
});
//异步执行,不需要获取上一个方法的返回值,本身也没有返回值
CompletableFuture<Void> futureSkuPrice = CompletableFuture.runAsync(() -> {
//获取商品最新价格
SkuPriceVo skuPrice = productSkuService.getSkuPrice(skuId);
itemVo.setSkuPrice(skuPrice);
}, threadPoolExecutor);
//异步执行,并且获取商品sku方法的返回值,本身没有返回值
CompletableFuture<Void> futureProductDetails = futureProductSku.thenAcceptAsync(productSku->{
//获取商品id
Long productId = productSku.getProductId();
//获取商品详情图片
String[] detailsImageUrlList = productDetailsService.getProductDetails(productId);
itemVo.setDetailsImageUrlList(detailsImageUrlList);
});
//异步执行,并且获取商品sku方法的返回值,本身没有返回值
CompletableFuture<Void> futureSkuSpecValue = futureProductSku.thenAcceptAsync(productSku -> {
//获取商品id
Long productId = productSku.getProductId();
//获取商品规格Map
Map<String, Long> skuSpecValueMap = productSkuService.getSkuSpecValue(productId);
itemVo.setSkuSpecValueMap(skuSpecValueMap);
});
//异步执行,不需要获取上一个方法的返回值,本身也没有返回值
CompletableFuture<Void> futureSkuStock = CompletableFuture.runAsync(() -> {
//获取商品库存信息
SkuStockVo skuStock = skuStockService.getSkuStock(skuId);
itemVo.setSkuStock(skuStock);
}, threadPoolExecutor);
//等待所有任务都完成
CompletableFuture.allOf(
futureProductSku,
futureProduct,
futureProductDetails,
futureSkuPrice,
futureSkuSpecValue,
futureSkuStock)
.join();//这是一个阻塞操作,程序会在此等待直到所有任务都完成。
//返回商品详情
return success(itemVo);
}