[TOC] # 商品详情优化 ## 1 优化分析 ### 1.1 思路 详情页面是被用户高频访问的,所以性能需要优化。一个系统最大的性能瓶颈,就是数据库的IO操作。从数据库入手也是调优性价比最高的切入点。 一般分为两个层面: - 一是对数据库本身做性能优化(参考MySQL高级课程) - 二是尽量避免直接查询数据库。避免直接查询数据库的解决办法就是:**缓存** ### 1.2 缓存常见问题 缓存最常见的4个问题: 1. 缓存穿透 2. 缓存雪崩 3. 缓存击穿 3. 缓存一致性问题:延迟双删 #### 1.2.1 缓存穿透 查询一个不存在的数据,由于缓存无法命中,将去查询数据库,但是数据库也无此记录,并且我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。流量大时,可能DB就挂掉了。如果利用不存在的key频繁攻击应用,这就是程序漏洞。 - 解决方案1 :空结果也进行缓存,但它的过期时间会很短,最长不超过五分钟,但是不能防止随机穿透(例如利用skuid=uuid生成大量的随机id进行访问,这些id很可能大部分都不存在)。 - 解决方案2 :使用Redis的Bitmap或者布隆过滤器解决随机穿透问题 。 #### 1.2.2 缓存雪崩 设置的缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重,导致雪崩。 - 解决方案1:在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,避免引发集体失效的事件。 - 解决方案2:可以采用集群部署方式,防止单节点宕机。 #### 1.2.3 缓存击穿 一个设置了过期时间的key,可能会在某些时间点被超高并发地访问(热点数据),如果这个key的缓存数据在大量请求同时进来时恰好失效,则所有对这个key的数据查询都会落到DB,称为缓存击穿。 - 解决方案:锁 img 总结: - 穿透:频繁查询不存在的数据 - 雪崩:很多key集体失效 - 击穿:一个热点key失效,并同时被高并发访问 ## 2 分布式锁 ### 2.1 本地锁的局限性 之前,我们学习过synchronized 及lock锁,这些锁都是本地锁。接下来写一个案例,演示本地锁的问题 #### 2.1.1 编写测试代码 在``spzx-product``中新建TestController中添加测试方法 ```java package com.spzx.product.controller; @Tag(name = "测试接口") @RestController @RequestMapping("/test") public class TestController { @Autowired private TestService testService; @GetMapping("/testLock") public AjaxResult testLock() { testService.testLock(); return AjaxResult.success(); } } ``` 业务接口 ```java package com.spzx.product.service; public interface TestService { void testLock(); } ``` 业务实现类 ```java package com.spzx.product.service.impl; @Service public class TestServiceImpl implements TestService { @Autowired private StringRedisTemplate stringRedisTemplate; @Override public void testLock() { // 查询Redis中的num值 String value = this.stringRedisTemplate.opsForValue().get("num"); // 没有该值return if (StringUtils.isBlank(value)){ return ; } // 有值就转成int int num = Integer.parseInt(value); // 把Redis中的num值+1 this.stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); } } ``` **说明**:重启`spzx-product`服务,通过reids客户端设置num=0 #### 2.1.2 使用ab工具测试 ##### 1 安装ab工具 在linux 系统中安装 **ab 压力测试工具** httpd-tools ```shell yum install -y httpd-tools ``` ##### 2 网关配置 配置本地锁测试接口为白名单 ```yaml # 不校验白名单 ignore: whites: ...省略 - /product/test/** ``` ##### 3 测试 **注意:测试前将Windows防火墙关闭!** 命令语法: ```shell ab -n(一次发送的请求数) -c(请求的并发数) 访问路径 ``` 测试如下:5000请求,100并发 ```shell ab -n 5000 -c 100 http://网关服务所在ip:8080/product/test/testLock ``` 查看Redis中的值: ![img](images/wps4-17317069436721.jpg) * 也可以使用jmeter工具压力测试:并发100 ![image-20240714195729578](images/image-20240714195729578-17314274287895.png) #### 2.1.3 使用本地锁 添加`synchronized`关键字 ```java @Override public synchronized void testLock() { } ``` 使用ab工具压力测试:5000次请求,并发100 查看Redis中的结果: ![img](images/wps6-17317071434152.jpg) 与预期一致。是否真的完美?接下来再看集群情况下,会怎样? #### 2.1.4 分布式环境问题演示 接下来启动`spzx-product`的9205、9215、9225三个运行实例: ##### 1 拷贝两个配置 ##### 2 分别添运行参数 ``` -Dserver.port=9215 -Dserver.port=9225 ``` ##### 3、启动三个实例 ##### 4、通过网关压力测试本地锁 ```shell ab -n 5000 -c 100 http://网关服务所在ip:8080/product/test/testLock ``` 查看Redis中的值: ![img](images/wps9-17317071434167.jpg) 集群情况下又出问题了。 以上测试,可以发现:本地锁只能锁住同一工程内的资源,在分布式系统里存在局限性。**此时需要分布式锁。** ### 2.2 分布式锁解决方案 要解决的问题:单机部署情况下的并发控制锁策略失效,需要一种跨JVM的机制来控制共享资源的访问。 主流的实现方案: 1. 基于数据库实现分布式锁 2. 基于缓存( Redis等) - 性能最高 3. 基于Zookeeper - 可靠性最高 ### 2.3 使用Redis实现分布式锁 img 1. 多个客户端同时获取锁(setnx) 2. 获取成功,执行业务逻辑,执行完成释放锁(del) 3. 其他客户端等待重试 `SETNX` 是 Redis 中的一个命令,用于设置键值对,但只有在键不存在的情况下才会执行成功。 `SETNX` 是 “SET if Not eXists” 的简称。它主要用于实现分布式锁和避免键值覆盖。 使用方法: ``` SETNX key value ``` - **key**:你想要设置的键。 - **value**:你想要设置的值。 返回值: - **1**:如果键不存在并且设置了值。 - **0**:如果键已经存在,设置操作不会执行。 示例: 假设我们有一个键 `mykey`: ``` 127.0.0.1:6379> SETNX mykey "Hello" (integer) 1 127.0.0.1:6379> SETNX mykey "World" (integer) 0 127.0.0.1:6379> GET mykey "Hello" ``` #### 2.3.1 分布式锁初版 ```java /** * 采用SpringDataRedis实现分布式锁 * 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key) */ @Override public void testLock() { //0.先尝试获取锁 setnx key val Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock"); //stringRedisTemplate.expire("lock", 3, TimeUnit.SECONDS); if(flag){ //获取锁成功,执行业务代码 //1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0 String value = stringRedisTemplate.opsForValue().get("num"); //System.out.println("value = " + value); //System.out.println("出现异常"); //int a = 9/0; /*try { Thread.sleep(7000); } catch (InterruptedException e) { throw new RuntimeException(e); }*/ //2.如果值为空则非法直接返回即可 if (StringUtils.isBlank(value)) { return; } //3.对num值进行自增加一 int num = Integer.parseInt(value); stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); //4.将锁释放 stringRedisTemplate.delete("lock"); }else{ //获取锁失败,等待1秒后重试 try { Thread.sleep(1000); System.out.println("重试"); this.testLock(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` 重启,服务集群,通过网关压力测试: ```http ab -n 5000 -c 100 http://ip:8080/product/test/testLock ``` 查看Redis中的值: ![img](images/wps12-173142742878912.jpg) **死锁问题:**如果有一个客户端在持有锁期间,业务逻辑出现异常,导致锁无法释放,会造成后续其他客户端一直等待。 **注意:每次测试后在redis中删除lock锁** ```java int serverPort = request.getServerPort(); ``` ```java if(serverPort == 9205){ int a = 9/0; } ``` **解决方案1:**:添加try catch finally ```java try{ String value = stringRedisTemplate.opsForValue().get("num"); if(serverPort == 9205){ int a = 9/0; } if (StringUtils.isBlank(value)) { return; } int num = Integer.parseInt(value); stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); } catch(Exception e){ System.out.println("异常捕获"); }finally{ //4.将锁释放 System.out.println("释放锁"); stringRedisTemplate.delete("lock"); } ``` 如果遇到极端情况,服务器宕机,则无法执行finally,因此可以结合使用方案2 **解决方案2:**设置过期时间,自动释放锁。 #### 2.3.2 优化之设置锁的过期时间 设置过期时间有两种方式: img 1. 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放) 设置过期时间: ```java stringRedisTemplate.expire("lock", 3, TimeUnit.SECONDS); ``` 2. 在set时指定过期时间(推荐) 设置过期时间:**(注意:删除上一步的finally)** ```java //0.先尝试获取锁:set命令代替setnx命令 //SET key value NX EX 3 //以上命令会在键不存在时设置键值,并同时设置一个3秒的过期时间,所有操作都在一个原子操作中完成。 Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 3, TimeUnit.SECONDS); ``` 压力测试肯定也没有问题。**(注意:测试前删掉上一步中redis中没有释放的锁)** **问题:**可能会释放其他服务器的锁。 **场景:**如果业务逻辑的执行时间是7s。执行流程如下 1. index1业务逻辑没执行完,3秒后锁被自动释放。 2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。 3. index3获取到锁,执行业务逻辑,1秒后index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。 最终等于没锁的情况。**(注意:测试时删掉发生异常的语句)** **结论:** - 过期设置的时间过短,锁在业务没有执行完之前提前释放,相当于没有加锁 - 过期时间设置的过长,执行效率低 - 可能会释放其他线程的锁 **解决:** - setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁 - 锁的过期时间设置的稍微长一点,防止程序没有执行完成,锁就被释放 #### 2.3.3 优化之UUID防误删 img ```java @Override public void testLock() throws InterruptedException { //0.先尝试获取锁 setnx key val String uuid = UUID.randomUUID().toString(); //过期时间设置的长一点 Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS); if(flag){ //获取锁成功,执行业务代码 //1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0 String value = stringRedisTemplate.opsForValue().get("num"); //2.如果值为空则非法直接返回即可 if (StringUtils.isBlank(value)) { return; } //3.对num值进行自增加一 int num = Integer.parseInt(value); stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); //4.将锁释放 if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))) { stringRedisTemplate.delete("lock"); } }else{ //获取锁成功,执行业务代码 Thread.sleep(10); System.out.println("重试"); this.testLock(); } } ``` **问题:**删除操作缺乏原子性。 场景: 1、index1执行删除时,查询到的lock值确实和uuid相等 ```java if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))) ``` 2、index1执行删除前,lock刚好过期时间已到,被Redis自动释放,在Redis中没有了锁。 ```java stringRedisTemplate.delete("lock"); ``` 3、index2获取了lock,index2线程获取到了cpu的资源,开始执行方法 4、index1执行删除,此时会把index2的lock删除: ```java stringRedisTemplate.delete("lock"); ``` **测试代码:调整各部分的时间** ```java @Override public void testLock() throws InterruptedException { //0.先尝试获取锁 setnx key val String uuid = UUID.randomUUID().toString(); //过期时间设置的长一点 Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.MILLISECONDS); if(flag){ //获取锁成功,执行业务代码 //1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0 String value = stringRedisTemplate.opsForValue().get("num"); Thread.sleep(20); //2.如果值为空则非法直接返回即可 if (StringUtils.isBlank(value)) { return; } //3.对num值进行自增加一 int num = Integer.parseInt(value); stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); //4.将锁释放 if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))) { Thread.sleep(20); stringRedisTemplate.delete("lock"); } }else{ //获取锁成功,执行业务代码 Thread.sleep(10); System.out.println("重试"); this.testLock(); } } ``` #### 2.3.4 优化之LUA脚本保证删除的原子性 ##### 1 LUA脚本基本语法 redis内置了LUA脚本解释器,redis客户端中通过EVAL可以执行LUA脚本 ```lua -- eval script numkeys key [key ...] arg [arg ...] -- 1、返回执行结果 最后的0代表没有参数 EVAL "return 'hello!'" 0 -- 2、参数的使用 2代表传入的keys(通过KEYS[i]使用)有两个,其余的参数通过ARGV[i]获取使用 ..拼接字符串 EVAL "return KEYS[1]..','..KEYS[2]..','..ARGV[1]..','..ARGV[2]" 2 key1 key2 argv1 argv2 -- 3、局部变量的使用 local用来定义局部变量 EVAL "local a=KEYS[1] return a" 1 hehe -- 4、条件判断 语句结束需要使用end, ~=不等于 ==等于 EVAL "local age=tonumber(KEYS[1]) if(age>=18) then return 'laoniao' elseif(age<18 and age>14) then return 'cainiao' else return 'pangju' end" 1 16 -- 5、LUA脚本执行redis命令 EVAL "return redis.call('set' , KEYS[1] , ARGV[1])" 1 a b EVAL "return redis.call('set' , KEYS[1] , KEYS[2] , KEYS[3] , KEYS[4])" 4 a b ex 60 ``` 释放锁的LUA脚本: ```lua if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end ``` redis java客户端使用LUA脚本: ```java //通过execute可以执行LUA脚本,参数1:脚本字符串,参数2:脚本返回值类型,参数3:keys列表,参数4:argv列表 stringRedisTemplate.execute( new DefaultRedisScript<>(script , Boolean.class), list, args... ) ``` ##### 2 使用LUA脚本优化分布式锁 **注意:** - 调整过期时间足够长,确保业务能够执行完成 - 修改释放锁的方式,使用lua ```java @Override public void testLock() throws InterruptedException { //0.先尝试获取锁 setnx key val String uuid = UUID.randomUUID().toString(); //过期时间设置的长一点 Boolean flag = stringRedisTemplate .opsForValue() .setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS); if(flag){ //获取锁成功,执行业务代码 //1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0 String value = stringRedisTemplate.opsForValue().get("num"); Thread.sleep(20); //2.如果值为空则非法直接返回即可 if (StringUtils.isBlank(value)) { return; } //3.对num值进行自增加一 int num = Integer.parseInt(value); stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); //4.将锁释放 //锁的释放 System.out.println("锁的释放"); String script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """; Boolean lock = stringRedisTemplate.execute( new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), uuid); System.out.println("result = " + lock); }else{ //获取锁成功,执行业务代码 Thread.sleep(10); System.out.println("重试"); this.testLock(); } } ``` ### 2.4 分布式锁总结 为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下几个条件: - 多进程可见:否则就无法实现分布式效果。 - 互斥性:同一时刻,只能有一个进程获得锁。 - 避免死锁:出现异常情况,保证锁可以被释放。 - 解铃还须系铃人:加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。 - 原子性:加锁和解锁必须具有原子性。 - 高可用:避免锁服务宕机或处理好宕机的补救措施(redis集群架构) ### 2.5 分布式锁改造-获取商品详情 ChannelController ```java //将 ProductSku productSku = productSkuService.getById(skuId); //修改成 ProductSku productSku = productSkuService.getProductSku(skuId); ``` ProductSkuServiceImpl ```java @Autowired private RedisTemplate redisTemplate; /* * 根据SkuID查询SKU商品信息 * * @param skuId * @return */ @Override public ProductSku getProductSku(Long skuId) { try { //1.优先从缓存中获取数据 //1.1 构建业务数据Key 形式:前缀+业务唯一标识 String dataKey = "product:sku:" + skuId; //1.2 查询Redis获取业务数据 ProductSku productSku = (ProductSku) redisTemplate.opsForValue().get(dataKey); //1.3 命中缓存则直接返回 if (productSku != null) { log.info("命中缓存,直接返回,线程ID:{},线程名称:{}", Thread.currentThread().getId(), Thread.currentThread().getName()); return productSku; } //2.尝试获取分布式锁(set k v nx ex 可能获取锁失败) //2.1 构建锁key String lockKey = "product:sku:lock:" + skuId; //2.2 采用UUID作为线程标识 String lockVal = UUID.randomUUID().toString().replaceAll("-", ""); //2.3 利用Redis提供set nx ex 获取分布式锁 Boolean flag = redisTemplate.opsForValue().setIfAbsent(lockKey, lockVal, 5, TimeUnit.SECONDS); if (flag) { //3.获取锁成功执行业务,将查询业务数据放入缓存Redis log.info("获取锁成功:{},线程名称:{}", Thread.currentThread().getId(), Thread.currentThread().getName()); try { //再次检查缓存,是否有数据 productSku = (ProductSku)redisTemplate.opsForValue().get(dataKey); if(productSku != null){ return productSku; } productSku = baseMapper.selectById(skuId); //防止缓存穿透,将null值转换成新对象存入数据库,并且设置较短的过期时间 long ttl = productSku == null ? 1 * 60 : 10 * 60; if(productSku == null){ productSku = new ProductSku(); } //将查询数据库结果放入缓存 redisTemplate.opsForValue().set(dataKey, productSku, ttl, TimeUnit.SECONDS); return productSku; } finally { //4.业务执行完毕释放锁 String scriptText = """ if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end """; DefaultRedisScript redisScript = new DefaultRedisScript<>(); redisScript.setScriptText(scriptText); redisScript.setResultType(Long.class); redisTemplate.execute(redisScript, Arrays.asList(lockKey), lockVal); } } else { try { //5.获取锁失败则自旋(业务要求必须执行) Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } log.error("获取锁失败,自旋:{},线程名称:{}", Thread.currentThread().getId(), Thread.currentThread().getName()); return this.getProductSku(skuId); } } catch (Exception e) { //兜底处理方案:Redis服务有问题,将业务数据获取自动从数据库获取 log.error("[商品服务]查询商品信息异常:{}", e); return baseMapper.selectById(skuId); } } ``` ## 3 Redis的Bitmap解决缓存穿透 ### 3.1 概述 Redis 的 Bitmap(位图)是一种特殊的字符串数据类型,它利用字符串类型键(key)来存储一系列连续的二进制位(bits),每个位可以独立地表示一个布尔值(0 或 1)。这种数据结构非常适合用于存储和操作大量二进状态的数据,尤其在需要高效空间利用率和特定位操作场景中表现出色。 ### 3.2 常见命令 - `setbit key offset value`:设置或清除指定偏移量上的位(bit)。`offset` 是从0开始的位索引,`value` 可以为 0 或 1。 - `getbit key offset`:返回指定偏移量上的位值。 ### 3.3 Bitmap的使用 #### 3.3.1 初始化数据 在`spzx-product` 模块的启动类中`实现CommandLineRunner接口` ```java @EnableCustomConfig @EnableRyFeignClients @SpringBootApplication public class SpzxProductApplication implements CommandLineRunner{ public static void main(String[] args) { SpringApplication.run(SpzxProductApplication.class,args); } @Autowired private RedisTemplate redisTemplate; @Autowired private ProductSkuMapper productSkuMapper; //Springboot应用初始化后会执行一次该方法 @Override public void run(String... args) throws Exception { String key = "product:sku:data"; //查询mysql里面商品skuId List productSkuList = productSkuMapper.selectList(null); productSkuList.forEach(item -> { //添加到redis redisTemplate.opsForValue().setBit(key,item.getId(),true); }); } } ``` #### 3.3.2 sku详情页整合Bitmap ChannelController的item方法中,首先从Bitmap中获取数据 ```java @Autowired private RedisTemplate redisTemplate; @Operation(summary = "商品详情") @GetMapping("/item/{skuId}") public AjaxResult item( @Parameter(description = "skuId") @PathVariable Long skuId) { String key = "product:sku:data"; Boolean flag = redisTemplate.opsForValue().getBit(key, skuId); if (!flag) { log.error("用户查询商品sku不存在:{}", skuId); //查询数据不存在直接返回空对象 throw new ServiceException("用户查询商品sku不存在"); } ... return success(itemVo); } ``` #### 3.3.3 商品上下架整合Bitmap 商品上架时将数据添加到Bitmap中 ProductServiceImpl.updateStatus方法 ```java @Autowired private RedisTemplate redisTemplate; @Transactional(rollbackFor = Exception.class) @Override public void updateStatus(Long id, Integer status) { Product product = new Product(); product.setId(id); product.setStatus(status); baseMapper.updateById(product); //更新sku的status productSkuMapper.update( null, new LambdaUpdateWrapper() .eq(ProductSku::getProductId, id) .set(ProductSku::getStatus, status)); String key = "product:sku:data"; List productSkuList = productSkuMapper.selectList( new LambdaQueryWrapper().eq(ProductSku::getProductId, id)); if (status == 1) { // 商品上架时,将 sku ID 列表放入 bitmap productSkuList.forEach(item -> { redisTemplate.opsForValue().setBit(key, item.getId(), true); }); } else if (status == -1) { // 商品下架时,将 sku ID 列表从 bitmap 中移除 productSkuList.forEach(item -> { redisTemplate.opsForValue().setBit(key, item.getId(), false); }); } } ``` ## 4 异步编排 ### 4.1 问题分析 问题:查询商品详情页的逻辑非常复杂,数据的获取都需要业务调用,必然需要花费更多的时间。 假如商品详情页的每个查询,需要如下标注的时间才能完成 - 获取sku的基本信息 1s - 获取商品信息 1.5s - 商品最新价格 0.5s - ...... 那么,用户需要3s后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这些操作,就会更快得到响应。 `CompletableFuture`可以使原本串行执行的代码,变为并行执行,提高代码执行速度。 ### 4.2 优化商品详情页 #### 4.2.1 ThreadPoolConfig 全局自定义线程池配置 ```java package com.spzx.product.configure; @Configuration public class ThreadPoolConfig { @Bean public ThreadPoolExecutor threadPoolExecutor() { int processorsCount = Runtime.getRuntime().availableProcessors(); int corePoolSize = processorsCount; // 设为处理器数量 int maximumPoolSize = processorsCount * 2; // 最大线程数设为核心线程的两倍 long keepAliveTime = 60L; // 保持活跃时间为 60 秒 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), // 使用有界队列控制任务数量 Executors.defaultThreadFactory(), //new ThreadPoolExecutor.CallerRunsPolicy() //自定义拒绝策略 (runnable, executor) -> { try { Thread.sleep(200); } catch (InterruptedException e) { } //再次将拒绝任务提交给线程池执行 executor.submit(runnable); } ); //线程会在需要时创建 return threadPoolExecutor; } } ``` - 核心线程数设为 `processorsCount`,即每个处理器一个线程。 - 最大线程数设为 `processorsCount * 2`,允许在高峰负载时增加线程数。 - `keepAliveTime` 设为 60 秒,在负载高峰过后可以减少活动线程数,减少资源占用。 - 使用 `ArrayBlockingQueue` 作为任务队列,并设置一个合适的尺寸,可以控制队列中最多只能存放多少个等待执行的任务。如果所有线程都在忙碌并且队列达到容量,则会根据拒绝策略处理新增任务。 #### 4.2.2 ItemServiceImpl ```java @Autowired private ThreadPoolExecutor threadPoolExecutor; @Operation(summary = "商品详情") @GetMapping("/item/{skuId}") public AjaxResult item( @Parameter(description = "skuId") @PathVariable Long skuId) { //0、Bitmap ...... ItemVo itemVo = new ItemVo(); //有返回值的异步方法 CompletableFuture futureProductSku = CompletableFuture.supplyAsync(() -> { //获取sku信息 ProductSku productSku = productSkuService.getById(skuId); itemVo.setProductSku(productSku); return productSku; }, threadPoolExecutor); //异步执行,并且获取商品sku方法的返回值,本身没有返回值 CompletableFuture futureProduct = futureProductSku.thenAcceptAsync(productSku -> { //获取商品id Long productId = productSku.getProductId(); //获取商品信息 Product product = productService.getById(productId); //商品信息:轮播图列表 String[] sliderUrlList = product.getSliderUrls().split(","); itemVo.setSliderUrlList(sliderUrlList); //商品信息:商品规格列表 JSONArray specValueList = JSON.parseArray(product.getSpecValue()); itemVo.setSpecValueList(specValueList); }); //异步执行,不需要获取上一个方法的返回值,本身也没有返回值 CompletableFuture futureSkuPrice = CompletableFuture.runAsync(() -> { //获取商品最新价格 SkuPriceVo skuPrice = productSkuService.getSkuPrice(skuId); itemVo.setSkuPrice(skuPrice); }, threadPoolExecutor); //异步执行,并且获取商品sku方法的返回值,本身没有返回值 CompletableFuture futureProductDetails = futureProductSku.thenAcceptAsync(productSku->{ //获取商品id Long productId = productSku.getProductId(); //获取商品详情图片 String[] detailsImageUrlList = productDetailsService.getProductDetails(productId); itemVo.setDetailsImageUrlList(detailsImageUrlList); }); //异步执行,并且获取商品sku方法的返回值,本身没有返回值 CompletableFuture futureSkuSpecValue = futureProductSku.thenAcceptAsync(productSku -> { //获取商品id Long productId = productSku.getProductId(); //获取商品规格Map Map skuSpecValueMap = productSkuService.getSkuSpecValue(productId); itemVo.setSkuSpecValueMap(skuSpecValueMap); }); //异步执行,不需要获取上一个方法的返回值,本身也没有返回值 CompletableFuture futureSkuStock = CompletableFuture.runAsync(() -> { //获取商品库存信息 SkuStockVo skuStock = skuStockService.getSkuStock(skuId); itemVo.setSkuStock(skuStock); }, threadPoolExecutor); //等待所有任务都完成 CompletableFuture.allOf( futureProductSku, futureProduct, futureProductDetails, futureSkuPrice, futureSkuSpecValue, futureSkuStock) .join();//这是一个阻塞操作,程序会在此等待直到所有任务都完成。 //返回商品详情 return success(itemVo); } ```