尚硅谷_尚品甄选_第4章_商品详情优化.md 31 KB

[TOC]

商品详情优化

1 优化分析

1.1 思路

详情页面是被用户高频访问的,所以性能需要优化。一个系统最大的性能瓶颈,就是数据库的IO操作。从数据库入手也是调优性价比最高的切入点。

一般分为两个层面:

  • 一是对数据库本身做性能优化(参考MySQL高级课程)
  • 二是尽量避免直接查询数据库。避免直接查询数据库的解决办法就是:缓存

1.2 缓存常见问题

缓存最常见的4个问题:

  1. 缓存穿透
  2. 缓存雪崩
  3. 缓存击穿
  4. 缓存一致性问题:延迟双删

1.2.1 缓存穿透

查询一个不存在的数据,由于缓存无法命中,将去查询数据库,但是数据库也无此记录,并且我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。流量大时,可能DB就挂掉了。如果利用不存在的key频繁攻击应用,这就是程序漏洞。

  • 解决方案1 :空结果也进行缓存,但它的过期时间会很短,最长不超过五分钟,但是不能防止随机穿透(例如利用skuid=uuid生成大量的随机id进行访问,这些id很可能大部分都不存在)。

  • 解决方案2 :使用Redis的Bitmap或者布隆过滤器解决随机穿透问题 。

1.2.2 缓存雪崩

设置的缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重,导致雪崩。

  • 解决方案1:在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,避免引发集体失效的事件。

  • 解决方案2:可以采用集群部署方式,防止单节点宕机。

1.2.3 缓存击穿

一个设置了过期时间的key,可能会在某些时间点被超高并发地访问(热点数据),如果这个key的缓存数据在大量请求同时进来时恰好失效,则所有对这个key的数据查询都会落到DB,称为缓存击穿。

  • 解决方案:锁

img

总结:

  • 穿透:频繁查询不存在的数据
  • 雪崩:很多key集体失效

  • 击穿:一个热点key失效,并同时被高并发访问

2 分布式锁

2.1 本地锁的局限性

之前,我们学习过synchronized 及lock锁,这些锁都是本地锁。接下来写一个案例,演示本地锁的问题

2.1.1 编写测试代码

spzx-product中新建TestController中添加测试方法

package com.spzx.product.controller;

@Tag(name = "测试接口")
@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private TestService testService;

    @GetMapping("/testLock")
    public AjaxResult testLock() {
        testService.testLock();
        return AjaxResult.success();
    }
}

业务接口

package com.spzx.product.service;

public interface TestService {

   void testLock();

}

业务实现类

package com.spzx.product.service.impl;
@Service
public class TestServiceImpl implements TestService {

   @Autowired
   private StringRedisTemplate stringRedisTemplate;
    
   @Override
   public void testLock() {
      // 查询Redis中的num值
      String value = this.stringRedisTemplate.opsForValue().get("num");
      // 没有该值return
      if (StringUtils.isBlank(value)){
         return ;
      }
      // 有值就转成int
      int num = Integer.parseInt(value);
      // 把Redis中的num值+1
      this.stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
   }
}

说明:重启spzx-product服务,通过reids客户端设置num=0

2.1.2 使用ab工具测试

1 安装ab工具

在linux 系统中安装 ab 压力测试工具 httpd-tools

yum install -y httpd-tools
2 网关配置

配置本地锁测试接口为白名单

  # 不校验白名单
  ignore:
    whites:
     ...省略
      - /product/test/**
3 测试

注意:测试前将Windows防火墙关闭!

命令语法:

ab  -n(一次发送的请求数)  -c(请求的并发数) 访问路径  

测试如下:5000请求,100并发

ab  -n 5000 -c 100 http://网关服务所在ip:8080/product/test/testLock 

查看Redis中的值:

img

  • 也可以使用jmeter工具压力测试:并发100

image-20240714195729578

2.1.3 使用本地锁

添加synchronized关键字

@Override
public synchronized void testLock() {
  
}

使用ab工具压力测试:5000次请求,并发100

查看Redis中的结果:

img

与预期一致。是否真的完美?接下来再看集群情况下,会怎样?

2.1.4 分布式环境问题演示

接下来启动spzx-product的9205、9215、9225三个运行实例:

1 拷贝两个配置
2 分别添运行参数
-Dserver.port=9215
-Dserver.port=9225
3、启动三个实例
4、通过网关压力测试本地锁
ab  -n 5000 -c 100 http://网关服务所在ip:8080/product/test/testLock 

查看Redis中的值:

img

集群情况下又出问题了。

以上测试,可以发现:本地锁只能锁住同一工程内的资源,在分布式系统里存在局限性。此时需要分布式锁。

2.2 分布式锁解决方案

要解决的问题:单机部署情况下的并发控制锁策略失效,需要一种跨JVM的机制来控制共享资源的访问。

主流的实现方案:

  1. 基于数据库实现分布式锁
  2. 基于缓存( Redis等) - 性能最高
  3. 基于Zookeeper - 可靠性最高

2.3 使用Redis实现分布式锁

img

  1. 多个客户端同时获取锁(setnx)
  2. 获取成功,执行业务逻辑,执行完成释放锁(del)
  3. 其他客户端等待重试

SETNX 是 Redis 中的一个命令,用于设置键值对,但只有在键不存在的情况下才会执行成功。

SETNX 是 “SET if Not eXists” 的简称。它主要用于实现分布式锁和避免键值覆盖。

使用方法:

SETNX key value
  • key:你想要设置的键。
  • value:你想要设置的值。

返回值:

  • 1:如果键不存在并且设置了值。
  • 0:如果键已经存在,设置操作不会执行。

示例:

假设我们有一个键 mykey

127.0.0.1:6379> SETNX mykey "Hello"
(integer) 1

127.0.0.1:6379> SETNX mykey "World"
(integer) 0

127.0.0.1:6379> GET mykey
"Hello"

2.3.1 分布式锁初版

/**
 * 采用SpringDataRedis实现分布式锁
 * 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
 */
@Override
public void testLock() {

    //0.先尝试获取锁 setnx key val
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock");
    //stringRedisTemplate.expire("lock", 3, TimeUnit.SECONDS);
    if(flag){
        //获取锁成功,执行业务代码
        //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
        String value = stringRedisTemplate.opsForValue().get("num");
        
        //System.out.println("value = " + value);
        //System.out.println("出现异常");
        //int a = 9/0;
        /*try {
        	Thread.sleep(7000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }*/
        
        //2.如果值为空则非法直接返回即可
        if (StringUtils.isBlank(value)) {
            return;
        }
        //3.对num值进行自增加一
        int num = Integer.parseInt(value);
        stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));

        //4.将锁释放
        stringRedisTemplate.delete("lock");

    }else{
        //获取锁失败,等待1秒后重试
        try {
            Thread.sleep(1000);
            System.out.println("重试");
            this.testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

重启,服务集群,通过网关压力测试:

ab  -n 5000 -c 100 http://ip:8080/product/test/testLock 

查看Redis中的值:

img

死锁问题:如果有一个客户端在持有锁期间,业务逻辑出现异常,导致锁无法释放,会造成后续其他客户端一直等待。

注意:每次测试后在redis中删除lock锁

int serverPort = request.getServerPort();
if(serverPort == 9205){
    int a = 9/0;
}

解决方案1::添加try catch finally

try{
    String value = stringRedisTemplate.opsForValue().get("num");
    
    if(serverPort == 9205){
        int a = 9/0;
    }
    
    if (StringUtils.isBlank(value)) {
        return;
    }
    int num = Integer.parseInt(value);
    stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));

} catch(Exception e){
    System.out.println("异常捕获");
}finally{
    //4.将锁释放
    System.out.println("释放锁");
    stringRedisTemplate.delete("lock");
}

如果遇到极端情况,服务器宕机,则无法执行finally,因此可以结合使用方案2

解决方案2:设置过期时间,自动释放锁。

2.3.2 优化之设置锁的过期时间

设置过期时间有两种方式:

img

  1. 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

设置过期时间:

   stringRedisTemplate.expire("lock", 3, TimeUnit.SECONDS);
  1. 在set时指定过期时间(推荐)

设置过期时间:(注意:删除上一步的finally)

   //0.先尝试获取锁:set命令代替setnx命令
   //SET key value NX EX 3
   //以上命令会在键不存在时设置键值,并同时设置一个3秒的过期时间,所有操作都在一个原子操作中完成。
   Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 3, TimeUnit.SECONDS); 

压力测试肯定也没有问题。(注意:测试前删掉上一步中redis中没有释放的锁)

问题:可能会释放其他服务器的锁。

场景:如果业务逻辑的执行时间是7s。执行流程如下

  1. index1业务逻辑没执行完,3秒后锁被自动释放。

  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

  3. index3获取到锁,执行业务逻辑,1秒后index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。

最终等于没锁的情况。(注意:测试时删掉发生异常的语句)

结论:

  • 过期设置的时间过短,锁在业务没有执行完之前提前释放,相当于没有加锁
  • 过期时间设置的过长,执行效率低
  • 可能会释放其他线程的锁

解决:

  • setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
  • 锁的过期时间设置的稍微长一点,防止程序没有执行完成,锁就被释放

2.3.3 优化之UUID防误删

img

@Override
public void testLock() throws InterruptedException {
    //0.先尝试获取锁 setnx key val
    String uuid = UUID.randomUUID().toString();
    //过期时间设置的长一点
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
    if(flag){
        //获取锁成功,执行业务代码
        //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
        String value = stringRedisTemplate.opsForValue().get("num");

        //2.如果值为空则非法直接返回即可
        if (StringUtils.isBlank(value)) {
            return;
        }
        //3.对num值进行自增加一
        int num = Integer.parseInt(value);
        stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));

        //4.将锁释放
        if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))) {
            stringRedisTemplate.delete("lock");
        }
    }else{
        //获取锁成功,执行业务代码
        Thread.sleep(10);
        System.out.println("重试");
        this.testLock();
    }
}

问题:删除操作缺乏原子性。

场景:

1、index1执行删除时,查询到的lock值确实和uuid相等

if(uuid.equals(stringRedisTemplate.opsForValue().get("lock")))

2、index1执行删除前,lock刚好过期时间已到,被Redis自动释放,在Redis中没有了锁。

stringRedisTemplate.delete("lock");

3、index2获取了lock,index2线程获取到了cpu的资源,开始执行方法

4、index1执行删除,此时会把index2的lock删除:

stringRedisTemplate.delete("lock");

测试代码:调整各部分的时间

@Override
public void testLock() throws InterruptedException {
    //0.先尝试获取锁 setnx key val
    String uuid = UUID.randomUUID().toString();
    //过期时间设置的长一点
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.MILLISECONDS);
    if(flag){
        //获取锁成功,执行业务代码
        //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
        String value = stringRedisTemplate.opsForValue().get("num");

        Thread.sleep(20);

        //2.如果值为空则非法直接返回即可
        if (StringUtils.isBlank(value)) {
            return;
        }
        //3.对num值进行自增加一
        int num = Integer.parseInt(value);
        stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));

        //4.将锁释放
        if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))) {
            Thread.sleep(20);
            stringRedisTemplate.delete("lock");
        }
    }else{
        //获取锁成功,执行业务代码
        Thread.sleep(10);
        System.out.println("重试");
        this.testLock();
    }
}

2.3.4 优化之LUA脚本保证删除的原子性

1 LUA脚本基本语法

redis内置了LUA脚本解释器,redis客户端中通过EVAL可以执行LUA脚本

-- eval script numkeys key [key ...] arg [arg ...]
-- 1、返回执行结果  最后的0代表没有参数
EVAL "return 'hello!'" 0
-- 2、参数的使用 2代表传入的keys(通过KEYS[i]使用)有两个,其余的参数通过ARGV[i]获取使用 ..拼接字符串
EVAL "return KEYS[1]..','..KEYS[2]..','..ARGV[1]..','..ARGV[2]" 2 key1 key2 argv1 argv2
-- 3、局部变量的使用 local用来定义局部变量
EVAL "local a=KEYS[1] return a" 1 hehe
-- 4、条件判断  语句结束需要使用end, ~=不等于  ==等于
EVAL "local age=tonumber(KEYS[1]) if(age>=18) then return 'laoniao' elseif(age<18 and age>14) then return 'cainiao' else return 'pangju' end" 1 16
-- 5、LUA脚本执行redis命令
EVAL "return redis.call('set' , KEYS[1] , ARGV[1])" 1 a b
EVAL "return redis.call('set' , KEYS[1] , KEYS[2] , KEYS[3] , KEYS[4])" 4 a b ex 60

释放锁的LUA脚本:

if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end

redis java客户端使用LUA脚本:

//通过execute可以执行LUA脚本,参数1:脚本字符串,参数2:脚本返回值类型,参数3:keys列表,参数4:argv列表
stringRedisTemplate.execute(
    new DefaultRedisScript<>(script , Boolean.class),
    list,
    args...
)
2 使用LUA脚本优化分布式锁

注意:

  • 调整过期时间足够长,确保业务能够执行完成
  • 修改释放锁的方式,使用lua

    @Override
    public void testLock() throws InterruptedException {
    //0.先尝试获取锁 setnx key val
    String uuid = UUID.randomUUID().toString();
    //过期时间设置的长一点
    Boolean flag = stringRedisTemplate
        .opsForValue()
        .setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
    
    if(flag){
        //获取锁成功,执行业务代码
        //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
        String value = stringRedisTemplate.opsForValue().get("num");
    
        Thread.sleep(20);
    
        //2.如果值为空则非法直接返回即可
        if (StringUtils.isBlank(value)) {
            return;
        }
        //3.对num值进行自增加一
        int num = Integer.parseInt(value);
        stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
    
        //4.将锁释放
        //锁的释放
        System.out.println("锁的释放");
        String script = """
                    if redis.call('get', KEYS[1]) == ARGV[1]
                        then return redis.call('del', KEYS[1])
                    else
                        return 0
                    end
                    """;
        Boolean lock = stringRedisTemplate.execute(
                    new DefaultRedisScript<>(script, Boolean.class),
                    Arrays.asList("lock"),
                    uuid);
        System.out.println("result = " + lock);
    }else{
        //获取锁成功,执行业务代码
        Thread.sleep(10);
        System.out.println("重试");
        this.testLock();
    }
    }
    

2.4 分布式锁总结

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下几个条件:

  • 多进程可见:否则就无法实现分布式效果。

  • 互斥性:同一时刻,只能有一个进程获得锁。

  • 避免死锁:出现异常情况,保证锁可以被释放。

  • 解铃还须系铃人:加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

  • 原子性:加锁和解锁必须具有原子性。

  • 高可用:避免锁服务宕机或处理好宕机的补救措施(redis集群架构)

2.5 分布式锁改造-获取商品详情

ChannelController

//将
ProductSku productSku = productSkuService.getById(skuId);
//修改成
ProductSku productSku = productSkuService.getProductSku(skuId);

ProductSkuServiceImpl

@Autowired
private RedisTemplate redisTemplate;

/*
     * 根据SkuID查询SKU商品信息
     *
     * @param skuId
     * @return
     */
@Override
public ProductSku getProductSku(Long skuId) {
    try {
        //1.优先从缓存中获取数据
        //1.1 构建业务数据Key 形式:前缀+业务唯一标识
        String dataKey = "product:sku:" + skuId;
        //1.2 查询Redis获取业务数据
        ProductSku productSku = (ProductSku) redisTemplate.opsForValue().get(dataKey);
        //1.3 命中缓存则直接返回
        if (productSku != null) {
            log.info("命中缓存,直接返回,线程ID:{},线程名称:{}", Thread.currentThread().getId(), Thread.currentThread().getName());
            return productSku;
        }
        
        //2.尝试获取分布式锁(set k v nx ex 可能获取锁失败)
        //2.1 构建锁key
        String lockKey = "product:sku:lock:" + skuId;
        //2.2 采用UUID作为线程标识
        String lockVal = UUID.randomUUID().toString().replaceAll("-", "");
        //2.3 利用Redis提供set nx ex 获取分布式锁
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(lockKey, lockVal, 5, TimeUnit.SECONDS);
        if (flag) {
            //3.获取锁成功执行业务,将查询业务数据放入缓存Redis
            log.info("获取锁成功:{},线程名称:{}", Thread.currentThread().getId(), Thread.currentThread().getName());

            try {
                //再次检查缓存,是否有数据
                productSku = (ProductSku)redisTemplate.opsForValue().get(dataKey);
                if(productSku != null){
                    return productSku;
                }

                productSku = baseMapper.selectById(skuId);
                //防止缓存穿透,将null值转换成新对象存入数据库,并且设置较短的过期时间
                long ttl = productSku == null ? 1 * 60 : 10 * 60;
                if(productSku == null){
                    productSku = new ProductSku();
                }
                //将查询数据库结果放入缓存
                redisTemplate.opsForValue().set(dataKey, productSku, ttl, TimeUnit.SECONDS);
                return productSku;
            } finally {
                //4.业务执行完毕释放锁
                String scriptText = """
                    if redis.call('get',KEYS[1]) == ARGV[1]
                    then
                        return redis.call('del',KEYS[1])
                    else
                        return 0
                    end
                    """;
                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
                redisScript.setScriptText(scriptText);
                redisScript.setResultType(Long.class);
                redisTemplate.execute(redisScript, Arrays.asList(lockKey), lockVal);
            }
        } else {
            try {
                //5.获取锁失败则自旋(业务要求必须执行)
                Thread.sleep(200);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.error("获取锁失败,自旋:{},线程名称:{}", Thread.currentThread().getId(), Thread.currentThread().getName());
            return this.getProductSku(skuId);
        }
    } catch (Exception e) {
        //兜底处理方案:Redis服务有问题,将业务数据获取自动从数据库获取
        log.error("[商品服务]查询商品信息异常:{}", e);
        return baseMapper.selectById(skuId);
    }
}

3 Redis的Bitmap解决缓存穿透

3.1 概述

Redis 的 Bitmap(位图)是一种特殊的字符串数据类型,它利用字符串类型键(key)来存储一系列连续的二进制位(bits),每个位可以独立地表示一个布尔值(0 或 1)。这种数据结构非常适合用于存储和操作大量二进状态的数据,尤其在需要高效空间利用率和特定位操作场景中表现出色。

3.2 常见命令

  • setbit key offset value:设置或清除指定偏移量上的位(bit)。offset 是从0开始的位索引,value 可以为 0 或 1。
  • getbit key offset:返回指定偏移量上的位值。

3.3 Bitmap的使用

3.3.1 初始化数据

spzx-product 模块的启动类中实现CommandLineRunner接口

@EnableCustomConfig
@EnableRyFeignClients
@SpringBootApplication
public class SpzxProductApplication implements CommandLineRunner{

    public static void main(String[] args) {
        SpringApplication.run(SpzxProductApplication.class,args);
    }

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private ProductSkuMapper productSkuMapper;

    //Springboot应用初始化后会执行一次该方法
    @Override
    public void run(String... args) throws Exception {
        String key = "product:sku:data";
        //查询mysql里面商品skuId
        List<ProductSku> productSkuList = productSkuMapper.selectList(null);
        productSkuList.forEach(item -> {
            //添加到redis
            redisTemplate.opsForValue().setBit(key,item.getId(),true);
        });
    }
}

3.3.2 sku详情页整合Bitmap

ChannelController的item方法中,首先从Bitmap中获取数据

@Autowired
private RedisTemplate redisTemplate;

@Operation(summary = "商品详情")
@GetMapping("/item/{skuId}")
public AjaxResult item(
    @Parameter(description = "skuId")
    @PathVariable Long skuId) {

	String key = "product:sku:data";
	Boolean flag = redisTemplate.opsForValue().getBit(key, skuId);
	if (!flag) {
		log.error("用户查询商品sku不存在:{}", skuId);
		//查询数据不存在直接返回空对象
		throw new ServiceException("用户查询商品sku不存在");
	}
    
    ...
        
    return success(itemVo);
}

3.3.3 商品上下架整合Bitmap

商品上架时将数据添加到Bitmap中

ProductServiceImpl.updateStatus方法

@Autowired
private RedisTemplate redisTemplate;

@Transactional(rollbackFor = Exception.class)
@Override
public void updateStatus(Long id, Integer status) {
    
    Product product = new Product();
    product.setId(id);
    product.setStatus(status);
    baseMapper.updateById(product);
    
    //更新sku的status
    productSkuMapper.update(
        null,
        new LambdaUpdateWrapper<ProductSku>()
        .eq(ProductSku::getProductId, id)
        .set(ProductSku::getStatus, status));
    
    String key = "product:sku:data";
    List<ProductSku> productSkuList = productSkuMapper.selectList(
        new LambdaQueryWrapper<ProductSku>().eq(ProductSku::getProductId, id));

    if (status == 1) {
        // 商品上架时,将 sku ID 列表放入 bitmap
        productSkuList.forEach(item -> {
            redisTemplate.opsForValue().setBit(key, item.getId(), true);
        });
    } else if (status == -1) {
        // 商品下架时,将 sku ID 列表从 bitmap 中移除
        productSkuList.forEach(item -> {
            redisTemplate.opsForValue().setBit(key, item.getId(), false);
        });
    }
}

4 异步编排

4.1 问题分析

问题:查询商品详情页的逻辑非常复杂,数据的获取都需要业务调用,必然需要花费更多的时间。

假如商品详情页的每个查询,需要如下标注的时间才能完成

  • 获取sku的基本信息 1s
  • 获取商品信息 1.5s
  • 商品最新价格 0.5s
  • ......

那么,用户需要3s后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这些操作,就会更快得到响应。

CompletableFuture可以使原本串行执行的代码,变为并行执行,提高代码执行速度。

4.2 优化商品详情页

4.2.1 ThreadPoolConfig

全局自定义线程池配置

package com.spzx.product.configure;

@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        int processorsCount = Runtime.getRuntime().availableProcessors();
        int corePoolSize = processorsCount; // 设为处理器数量
        int maximumPoolSize = processorsCount * 2; // 最大线程数设为核心线程的两倍
        long keepAliveTime = 60L; // 保持活跃时间为 60 秒

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100), // 使用有界队列控制任务数量
                Executors.defaultThreadFactory(),
                //new ThreadPoolExecutor.CallerRunsPolicy()
            	//自定义拒绝策略
                (runnable, executor) -> {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                    }
                    //再次将拒绝任务提交给线程池执行
                    executor.submit(runnable);
                }
        );

        //线程会在需要时创建
        return threadPoolExecutor;
    }
}
  • 核心线程数设为 processorsCount,即每个处理器一个线程。
  • 最大线程数设为 processorsCount * 2,允许在高峰负载时增加线程数。
  • keepAliveTime 设为 60 秒,在负载高峰过后可以减少活动线程数,减少资源占用。
  • 使用 ArrayBlockingQueue 作为任务队列,并设置一个合适的尺寸,可以控制队列中最多只能存放多少个等待执行的任务。如果所有线程都在忙碌并且队列达到容量,则会根据拒绝策略处理新增任务。

4.2.2 ItemServiceImpl

@Autowired
private ThreadPoolExecutor threadPoolExecutor;

@Operation(summary = "商品详情")
@GetMapping("/item/{skuId}")
public AjaxResult item(
    @Parameter(description = "skuId")
    @PathVariable Long skuId) {
    
    
    //0、Bitmap
    ......

	ItemVo itemVo = new ItemVo();
    //有返回值的异步方法
    CompletableFuture<ProductSku> futureProductSku = CompletableFuture.supplyAsync(() -> {
        //获取sku信息
        ProductSku productSku = productSkuService.getById(skuId);
        itemVo.setProductSku(productSku);
        return productSku;
    }, threadPoolExecutor);

    //异步执行,并且获取商品sku方法的返回值,本身没有返回值
    CompletableFuture<Void> futureProduct = futureProductSku.thenAcceptAsync(productSku -> {

        //获取商品id
        Long productId = productSku.getProductId();

        //获取商品信息
        Product product = productService.getById(productId);

        //商品信息:轮播图列表
        String[] sliderUrlList = product.getSliderUrls().split(",");
        itemVo.setSliderUrlList(sliderUrlList);
        //商品信息:商品规格列表
        JSONArray specValueList = JSON.parseArray(product.getSpecValue());
        itemVo.setSpecValueList(specValueList);
    });

    //异步执行,不需要获取上一个方法的返回值,本身也没有返回值
    CompletableFuture<Void> futureSkuPrice = CompletableFuture.runAsync(() -> {
        //获取商品最新价格
        SkuPriceVo skuPrice = productSkuService.getSkuPrice(skuId);
        itemVo.setSkuPrice(skuPrice);
    }, threadPoolExecutor);


    //异步执行,并且获取商品sku方法的返回值,本身没有返回值
    CompletableFuture<Void> futureProductDetails = futureProductSku.thenAcceptAsync(productSku->{
        //获取商品id
        Long productId = productSku.getProductId();
        //获取商品详情图片
        String[] detailsImageUrlList = productDetailsService.getProductDetails(productId);
        itemVo.setDetailsImageUrlList(detailsImageUrlList);
    });


    //异步执行,并且获取商品sku方法的返回值,本身没有返回值
    CompletableFuture<Void> futureSkuSpecValue = futureProductSku.thenAcceptAsync(productSku -> {
        //获取商品id
        Long productId = productSku.getProductId();
        //获取商品规格Map
        Map<String, Long> skuSpecValueMap = productSkuService.getSkuSpecValue(productId);
        itemVo.setSkuSpecValueMap(skuSpecValueMap);
    });

    //异步执行,不需要获取上一个方法的返回值,本身也没有返回值
    CompletableFuture<Void> futureSkuStock = CompletableFuture.runAsync(() -> {
        //获取商品库存信息
        SkuStockVo skuStock = skuStockService.getSkuStock(skuId);
        itemVo.setSkuStock(skuStock);
    }, threadPoolExecutor);

    //等待所有任务都完成
    CompletableFuture.allOf(
        futureProductSku,
        futureProduct,
        futureProductDetails,
        futureSkuPrice,
        futureSkuSpecValue,
        futureSkuStock)
        .join();//这是一个阻塞操作,程序会在此等待直到所有任务都完成。

    //返回商品详情
    return success(itemVo);
}