第6章 详情优化.md 39 KB

谷粒随享

第6章 详情优化

学习目标:

  • 微服务链路追踪组件应用(基础组件)
  • 针对于相对高并发业务场景进行性能优化
    • 采用分布式缓存Redis
    • 缓存击穿(分布式锁)
    • 缓存穿透(布隆过滤器)
    • 缓存雪崩(集群)
    • 缓存一致性(基于MySQL-BinLog数据最终一致)
    • 采用异步任务+线程池

1、链路追踪Zipkin

​ 一个看起来很简单的应用,背后可能需要数十或数百个服务来支撑,一个请求就要多次服务调用。当请求变慢、或者不能使用时,我们是不知道是哪个后台服务引起的。这时,我们使用 Zipkin 就能解决这个问题。由于业务访问量的增大,业务复杂度增加,以及微服务架构和容器技术的兴起,要对系统进行各种拆分。微服务系统拆分后,我们可以使用 Zipkin 链路,来快速定位追踪有故障的服务点。

Zipkin 是一款开源的分布式实时数据追踪系统(Distributed Tracking System),能够收集服务间调用的时序数据,提供调用链路的追踪。

Zipkin 其主要功能是聚集来自各个异构系统的实时监控数据,在微服务架构下,十分方便地用于服务响应延迟等问题的定位。

Zipkin 每一个调用链路通过一个 trace id 来串联起来,只要你有一个 trace id ,就能够直接定位到这次调用链路,并且可以根据服务名、标签、响应时间等进行查询,过滤那些耗时比较长的链路节点。

Zipkin 分布式跟踪系统就能非常好地解决该问题,主要解决以下3点问题:

    1. 动态展示服务的链路;
    1. 分析服务链路的瓶颈并对其进行调优;
    1. 快速进行服务链路的故障发现。

在优化前通过Zipkin链路追踪进行查看接口耗时

  1. service微服务父工程pom.xml中新增Zipkin相关依赖

    <dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-starter-zipkin</artifactId>
       <version>2.2.8.RELEASE</version>
    </dependency>
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
       <groupId>io.micrometer</groupId>
       <artifactId>micrometer-tracing-bridge-brave</artifactId>
    </dependency>
    <dependency>
       <groupId>io.zipkin.reporter2</groupId>
       <artifactId>zipkin-reporter-brave</artifactId>
    </dependency>
    
  2. 在Nacos配置common.yaml中新增配置(已完成)

    spring:
     zipkin:
       base-url: http://192.168.200.6:9411
       discovery-client-enabled: false
       sender:
         type: web
    management:
     zipkin:
       tracing:
         endpoint: http://192.168.200.6:9411/api/v2/spans
     tracing:
       sampling:
         probability: 1.0 # 记录速率100%
    
  3. 访问专辑详情接口、或查看专辑页面进行测试

http://localhost:8502/doc.html#/web-api/%E4%B8%93%E8%BE%91%E8%AF%A6%E6%83%85%E7%AE%A1%E7%90%86/getItem

  1. 通过Zipkin管理页面查看接口耗时

image-20231007101258763

2、详情优化

虽然咱们实现了页面需要的功能,但是考虑到该页面是被用户高频访问的,所以性能需要优化。一般一个系统最大的性能瓶颈,就是数据库的io操作。从数据库入手也是调优性价比最高的切入点。

一般分为两个层面:

  • 一是提高数据库sql本身的性能。
  • 二是尽量避免直接查询数据库。

重点要讲的是第二个层面:尽量避免直接查询数据库。

解决办法就是:缓存

2.1 缓存常见问题

缓存最常见的3个问题:

  1. 缓存穿透
  2. 缓存雪崩
  3. 缓存击穿

缓存穿透: 是指查询一个不存在的数据,由于缓存无法命中,将去查询数据库,但是数据库也无此记录,并且出于容错考虑,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。

  • 解决1 :空结果也进行缓存,但它的过期时间会很短,最长不超过五分钟,但是不能防止随机穿透。
  • 解决2 :使用布隆过滤器来解决随机穿透问题。

缓存雪崩:是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

  • 解决1:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
  • 解决2:如果单节点宕机,可以采用集群部署方式防止雪崩

缓存击穿: 是指对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:如果这个key在大量请求同时进来之前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。

与缓存雪崩的区别:

  1. 击穿是一个热点key失效
  2. 雪崩是很多key集体失效
  • 解决:锁

2.2 缓存击穿解决方案

2.2.1 本地锁的局限性

之前,我们学习过synchronized 及lock锁,这些锁都是本地锁。接下来写一个案例,演示本地锁的问题

service-album微服务中添加 TestController 控制器:

package com.atguigu.tingshu.album.api;

import com.atguigu.tingshu.album.service.TestService;
import com.atguigu.tingshu.common.result.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author atguigu
 * @ClassName TestController
 * @description: TODO
 * @date 2023年08月22日
 * @version: 1.0
 */
@RestController
@RequestMapping("api/album/test")
public class TestController {

    //  注入服务层方法
    @Autowired
    private TestService testService;

    /**
     * 测试分布式锁
     * @return
     */
    @GetMapping("testLock")
    public Result testLock(){
        testService.testLock();
        return Result.ok();
    }
}

TestService接口:

public interface TestService {
    /**
     * 测试分布式锁
     */
    void testLock();

}

TestServiceImpl实现:

package com.atguigu.tingshu.album.service.impl;

import com.atguigu.tingshu.album.service.TestService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

/**
 * @author: atguigu
 * @create: 2023-10-07 08:54
 */
@Service
public class TestServiceImpl  implements TestService {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;


    @Override
    public void testLock() {
        try {
            //1.从Redis缓存中获取key="num"的值  保证redis中存在"num"(手动提前在redis中创建key)
            String value = stringRedisTemplate.opsForValue().get("num");
            if (StringUtils.isBlank(value)) {
                return;
            }
            //2.对获取到值进行+1操作
            int num = Integer.parseInt(value);
            stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));

        } catch (NumberFormatException e) {
            e.printStackTrace();
        }
    }
}

启动三个微服务端口号分别为 8501、8601、8701 然后通过网关复杂均衡访问控制器

注释掉nacos配置中端口,在本地bootstrap.properties中配置默认端口,避免端口被覆盖。导致端口冲突

测试本地锁:

在linux 系统中安装 ab 压力测试工具 httpd-tools(已安装)

yum install -y httpd-tools

命令语法:ab -n(一次发送的请求数) -c(请求的并发数) 访问路径

ab  -n 5000 -c 100 http://192.168.200.1:8500/api/album/test/testLock

发现结果不是5000 ,说明本地锁是锁不住资源的;因此要使用分布式锁!

2.2.2 分布式锁

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁主流的实现方案:

  1. 基于数据库实现分布式锁
  2. 基于缓存( Redis等)
  3. 基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

  1. 性能:Redis最高
  2. 可靠性:Zookeeper最高(CAP定理)

2.2.3 使用redis脚本实现分布式锁

2.2.3.1 使用setnx 命令实现

  1. 多个客户端同时获取锁(setnx)

  2. 获取成功,执行业务逻辑{从db获取数据,放入缓存,执行完成释放锁(del)}

  3. 其他客户端等待重试

    /**
    * 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
    */
    @Override
    public void testLock() {
    
       //0.先尝试获取锁 setnx key val
       Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock");
       if(flag){
           //获取锁成功,执行业务代码
           //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
           String value = stringRedisTemplate.opsForValue().get("num");
           //2.如果值为空则非法直接返回即可
           if (StringUtils.isBlank(value)) {
               return;
           }
           //3.对num值进行自增加一
           int num = Integer.parseInt(value);
           stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
    
           //4.将锁释放
           stringRedisTemplate.delete("lock");
    
       }else{
           try {
               Thread.sleep(100);
               this.testLock();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    }
    

进行测试:运行结果应该是5000。

可能出现的问题:

问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放

解决:设置过期时间,自动释放锁。

2.2.3.2 使用setex给锁设置过期时间

  1. 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
  2. 在set时指定过期时间(推荐)

将上锁代码改为:

Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 3, TimeUnit.SECONDS);

测试结果应该是5000

问题:可能会释放其他服务器的锁。

场景:如果业务逻辑的执行时间是7s。执行流程如下

  1. index1业务逻辑没执行完,3秒后锁被自动释放。
  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
  3. index3获取到锁,执行业务逻辑
    1. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。

最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁。

2.3.3.3 使用UUID 防止误删锁

修改代码:

@Override
public synchronized void testLock() {
    //  声明uuid
    String uuid = UUID.randomUUID().toString();
    //  将uuid 存储到缓存中
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
    if (flag){
        try {
            //1.从Redis缓存中获取key="num"的值  保证redis中存在"num"(手动提前在redis中创建key)
            String value = stringRedisTemplate.opsForValue().get("num");
            if (StringUtils.isBlank(value)) {
                return;
            }
            //2.对获取到值进行+1操作
            int num = Integer.parseInt(value);
            stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
            //  判断uuid 是否与缓存中的数据相等
            if (uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){
                //  如果相等,删除锁
                stringRedisTemplate.delete("lock");
            }

        } catch (NumberFormatException e) {
            e.printStackTrace();
        }
    }else {
        try {
            Thread.sleep(100);
            this.testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

问题:删除操作缺乏原子性。

场景:

  1. index1执行删除时,查询到的lock值确实和uuid相等
  2. index1执行删除前,lock刚好过期时间已到,被Redis自动释放 在Redis中没有了锁。

  3. index2获取了lock,index2线程获取到了cpu的资源,开始执行方法

  4. index1执行删除,此时会把index2的lock删除

index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行删除的是index2的锁!

解决方案:使用lua脚本保证删除锁具有原子性操作

2.3.3.4 LUA脚本保证删除具有原子性

/**
 * 采用SpringDataRedis实现分布式锁
 * 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
 */
@Override
public void testLock() {

    //0.先尝试获取锁 setnx key val
    //问题:锁可能存在线程间相互释放
    //Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 10, TimeUnit.SECONDS);
    //解决:锁值设置为uuid
    String uuid = UUID.randomUUID().toString();
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);

    if(flag){
        //获取锁成功,执行业务代码
        //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
        String value = stringRedisTemplate.opsForValue().get("num");
        //2.如果值为空则非法直接返回即可
        if (StringUtils.isBlank(value)) {
            return;
        }
        //3.对num值进行自增加一
        int num = Integer.parseInt(value);
        stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));

        //4.将锁释放 判断uuid
        //问题:删除操作缺乏原子性。
        //if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){ //线程一:判断是满足是当前线程锁的值
        //    //条件满足,此时锁正好到期,redis锁自动释放了线程2获取锁成功,线程1将线程2的锁删除
        //    stringRedisTemplate.delete("lock");
        //}
        //解决:redis执行lua脚本保证原子,lua脚本执行会作为一个整体执行

        //执行脚本参数 参数1:脚本对象封装lua脚本,参数二:lua脚本中需要key参数(KEYS[i])  参数三:lua脚本中需要参数值 ARGV[i]
        //4.1 先创建脚本对象 DefaultRedisScript泛型脚本语言返回值类型 Long 0:失败 1:成功
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        //4.2设置脚本文本
        String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
                "then\n" +
                "    return redis.call(\"del\",KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        redisScript.setScriptText(script);
        //4.3 设置响应类型
        redisScript.setResultType(Long.class);
        stringRedisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
    }else{
        try {
            //睡眠
            Thread.sleep(100);
            //自旋重试
            this.testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

总结:

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
  • 加锁和解锁必须具有原子性。

2.2.4 使用 redisson 解决分布式锁

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

官方文档地址:https://github.com/Redisson/Redisson/wiki

实现步骤:

  1. service-util 模块中解开redisson相关依赖(注释放开)

    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    </dependency>
    
  2. service-util 模块中添加配置类

    package com.atguigu.tingshu.common.config.redis;
    
    import lombok.Data;
    import org.apache.commons.lang3.StringUtils;
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.redisson.config.SingleServerConfig;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
    * redisson配置信息
    */
    @Data
    @Configuration
    @ConfigurationProperties("spring.data.redis")
    public class RedissonConfig {
    
    private String host;
    
    private String password;
    
    private String port;
    
    private int timeout = 3000;
    private static String ADDRESS_PREFIX = "redis://";
    
    /**
     * 自动装配
     */
    @Bean
    RedissonClient redissonSingle() {
        Config config = new Config();
    
        if (StringUtils.isBlank(host)) {
            throw new RuntimeException("host is  empty");
        }
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(ADDRESS_PREFIX + this.host + ":" + port)
                .setTimeout(this.timeout);
        if (StringUtils.isNotBlank(this.password)) {
            serverConfig.setPassword(this.password);
        }
        return Redisson.create(config);
    }
    }
    
  3. 优化代码

    package com.atguigu.tingshu.album.service.impl;
    
    import com.alibaba.cloud.commons.lang.StringUtils;
    import com.atguigu.tingshu.album.service.TestService;
    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Service;
    
    /**
    * @author atguigu
    * @ClassName TestServiceImpl
    * @description: TODO
    * @date 2023年08月22日
    * @version: 1.0
    */
    @Service
    public class TestServiceImpl implements TestService {
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Autowired
    private RedissonClient redissonClient;
    
    @Override
    public void testLock() {
        try {
            //先尝试获取分布式锁 1.创建锁对象  2.调用获取锁方法
            RLock lock = redissonClient.getLock("lock");
    
            //获取锁成功后,才执行业务
            lock.lock();  //阻塞到获取锁成功为止
            //lock.tryLock(5, TimeUnit.SECONDS);  //参数1:等待锁获取最大时间  参数2:时间单位  获取锁失败返回false
            //lock.tryLock(3, 5, TimeUnit.SECONDS);//参数1:等待锁获取最大时间  参数2:锁过期时间,参数3:时间单位  获取锁失败返回false
    
            try {
                //1.从Redis缓存中获取key="num"的值  保证redis中存在"num"(手动提前在redis中创建key)
                String value = redisTemplate.opsForValue().get("num");
                if (StringUtils.isBlank(value)) {
                    return;
                }
                //2.对获取到值进行+1操作
                int num = Integer.parseInt(value);
                redisTemplate.opsForValue().set("num", String.valueOf(++num));
            } finally {
                //业务执行完毕释放锁
                lock.unlock();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    }
    

分布式锁:

ab  -n 5000 -c 100 http://192.168.200.1:8500/api/album/test/testLock

2.3 分布式锁整合业务

2.3.1 使用redis 分布式锁整合业务

/**
 * 先采用SpringDataRedis分布式锁 解决缓存击穿
 * 根据专辑ID查询专辑信息包含专辑属性列表
 *
 * @param id
 * @return
 */
@Override
public AlbumInfo getAlbumInfo(Long id) {
    try {
        //1.优先从缓存中获取业务数据
        //1.1 构建业务数据Key
        String dataKey = RedisConstant.ALBUM_INFO_PREFIX + id;
        //1.2 查询Redis中缓存数据
        AlbumInfo albumInfo = (AlbumInfo) redisTemplate.opsForValue().get(dataKey);
        //1.3 如果命中缓存则直接返回即可,未命中缓存,执行第二步
        if (albumInfo != null) {
            return albumInfo;
        }
        //2.尝试获取分布式锁
        //2.1 构建锁的Key
        String lockKey = RedisConstant.ALBUM_LOCK_PREFIX + id + RedisConstant.CACHE_LOCK_SUFFIX;
        //2.2 产生锁的值:锁标识UUID
        String lockValue = IdUtil.fastSimpleUUID();

        //2.3 尝试获取锁,利用set ex nx命令实现分布式锁
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, RedisConstant.ALBUM_LOCK_EXPIRE_PX2, TimeUnit.SECONDS);

        //2.4 获取锁成功执行业务
        if (flag) {
            try {
                //3.执行业务(查库业务,将查询结果放入缓存)
                albumInfo = this.getAlbumInfoFromDB(id);
                //3.1 查询数据库如果为空 将空数据有加入缓存中,设置短暂时间:10分钟
                if (albumInfo == null) {
                    redisTemplate.opsForValue().set(dataKey, albumInfo, RedisConstant.ALBUM_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
                    return albumInfo;
                }
                //3.2 查询数据库如果有值 将空数据有加入缓存中,设置时间:基础时间(1小时)+随机值
                int ttl = RandomUtil.randomInt(500, 3600);
                redisTemplate.opsForValue().set(dataKey, albumInfo, RedisConstant.ALBUM_TIMEOUT + ttl, TimeUnit.SECONDS);
                return albumInfo;
            } finally {
                //4.释放锁 采用lua脚本释放锁
                String text = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
                        "then\n" +
                        "    return redis.call(\"del\",KEYS[1])\n" +
                        "else\n" +
                        "    return 0\n" +
                        "end";
                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
                redisScript.setScriptText(text);
                redisScript.setResultType(Long.class);
                redisTemplate.execute(redisScript, Arrays.asList(lockKey), lockValue);
            }
        } else {
            //2.5 获取锁失败-自旋
            Thread.sleep(200);
            return this.getAlbumInfo(id);
        }
    } catch (Exception e) {
        log.error("【专辑服务】获取专辑异常:{}", e);
        //兜底处理,查询数据库返回业务数据
        return this.getAlbumInfoFromDB(id);
    }
}


/**
 * 根据专辑ID查询专辑信息包含专辑属性列表
 *
 * @param id
 * @return
 */
@Override
public AlbumInfo getAlbumInfoFromDB(Long id) {
    //1.根据主键查询专辑信息
    AlbumInfo albumInfo = albumInfoMapper.selectById(id);

    //2.根据专辑ID查询专辑属性列表
    LambdaQueryWrapper<AlbumAttributeValue> queryWrapper = new LambdaQueryWrapper<>();
    queryWrapper.eq(AlbumAttributeValue::getAlbumId, id);
    List<AlbumAttributeValue> albumAttributeValues = albumAttributeValueMapper.selectList(queryWrapper);
    albumInfo.setAlbumAttributeValueVoList(albumAttributeValues);
    return albumInfo;
}

2.3.2 使用redisson整合分布式锁

/**
 * 先采用Redisson分布式锁 解决缓存击穿
 * 根据专辑ID查询专辑信息包含专辑属性列表
 *
 * @param id
 * @return
 */
@Override
public AlbumInfo getAlbumInfo(Long id) {
    try {
        //1.优先从缓存中获取业务数据
        //1.1 构建缓存业务数据Key 形式:前缀:+ 主键标识
        String dataKey = RedisConstant.ALBUM_INFO_PREFIX + id;
        //1.2 查询Redis缓存
        AlbumInfo albumInfo = (AlbumInfo) redisTemplate.opsForValue().get(dataKey);
        //1.3 命中缓存返回即可
        //1.4 未命中缓存则执行获取分布式锁,避免缓存击穿
        if (albumInfo != null) {
            return albumInfo;
        }
        //2.获取分布式锁
        //2.1 构建锁Key
        String lockKey = RedisConstant.ALBUM_LOCK_PREFIX + id + RedisConstant.CACHE_LOCK_SUFFIX;
        //2.2 创建锁对象
        RLock lock = redissonClient.getLock(lockKey);
        //2.3 获取锁
        lock.lock();

        //3.执行业务
        try {
            //3.1 再次查询一次缓存
            albumInfo = (AlbumInfo) redisTemplate.opsForValue().get(dataKey);
            if (albumInfo != null) {
                return albumInfo;
            }
            //3.2 缓存未命中,在执行查询数据库
            albumInfo = this.getAlbumInfoFromDB(id);
            //3.3 查询数据库如果为空 将空数据有加入缓存中,设置短暂时间: 10 分钟
            if (albumInfo == null) {
                redisTemplate.opsForValue().set(dataKey, albumInfo, RedisConstant.ALBUM_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
                return albumInfo;
            }
            //3.4 查询数据库如果有值 将空数据有加入缓存中,设置时间:基础时间(1小时)+随机值
            int ttl = RandomUtil.randomInt(500, 3600);
            redisTemplate.opsForValue().set(dataKey, albumInfo, RedisConstant.ALBUM_TIMEOUT + ttl, TimeUnit.SECONDS);
            return albumInfo;
        } finally {
            //4.释放锁
            lock.unlock();
        }
    } catch (Exception e) {
        //兜底处理方案:查询数据库
        log.error("[专辑服务]Redis服务不可用,查询DB:{}", e);
        return this.getAlbumInfoFromDB(id);
    }
}

2.4 AOP与分布式锁整合

专辑详情中多项数据接口需要查询缓存,那么分布式锁的业务逻辑代码就会出现大量的重复。因此,我们可以参考Spring框架事务注解来实现简化代码。只要类上添加了一个注解,那么这个注解就会自带分布式锁的功能。

实现如下:

  • service-util 模块中添加一个注解
  • 提供切面类,采用环绕通知对自定义注解进行增强

2.4.1 自定义注解

package com.atguigu.tingshu.common.cache;

import java.lang.annotation.*;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface GuiGuCache {


    /**
     * 注解属性:缓存数据前缀、分布式锁Key前缀
     *
     * @return
     */
    String prefix() default "cache:";


}

2.4.2 自定义切面类

package com.atguigu.tingshu.common.cache;

import cn.hutool.core.util.RandomUtil;
import com.atguigu.tingshu.common.constant.RedisConstant;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author: atguigu
 * @create: 2023-11-01 11:15
 */
@Slf4j
@Aspect
@Component
public class GuiGuCacheAspect {

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private RedissonClient redissonClient;


    /**
     * 缓存自定义注解切面类
     *
     * @param joinPoint
     * @param guiGuCache
     */
    @SneakyThrows
    @Around("@annotation(guiGuCache)")
    public Object cacheAspect(ProceedingJoinPoint joinPoint, GuiGuCache guiGuCache) {
        try {
            Object result = new Object();
            //1.优先从缓存中获取数据
            //1.1 动态构建缓存业务数据Key 形式:注解定义前缀+方法参数
            String param = "none";
            Object[] args = joinPoint.getArgs();
            if (args != null && args.length > 0) {
                param = Arrays.asList(args).stream().map(arg -> arg.toString()).collect(Collectors.joining(":"));
            }
            String dataKey = guiGuCache.prefix() + param;
            //1.2 查询Redis缓存
            result = redisTemplate.opsForValue().get(dataKey);

            //1.3 如果命中缓存直接返回
            if (result != null) {
                return result;
            }
            //2.获取分布式锁
            //2.1 动态构建锁的Key
            String lockKey = guiGuCache.prefix() + param + RedisConstant.CACHE_LOCK_SUFFIX;
            //2.2 创建锁对象
            RLock lock = redissonClient.getLock(lockKey);

            //2.3 获取分布式锁
            lock.lock();

            //3.业务逻辑执行(目标方法查库执行)
            try {
                //3.1 再次查询一次缓存 处于一开始获取锁失败塞状态线程,必然获取锁成功,避免这些多余线程查库
                result = redisTemplate.opsForValue().get(dataKey);
                if (result != null) {
                    return result;
                }
                //3.2 执行查询数据库
                result = joinPoint.proceed();
                //3.3 查询数据库如果为空 将空数据有加入缓存中,设置短暂时间: 10 分钟
                if (result == null) {
                    redisTemplate.opsForValue().set(dataKey, result, RedisConstant.ALBUM_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
                    return result;
                }
                //3.4 查询数据库如果有值 将空数据有加入缓存中,设置时间:基础时间(1小时)+随机值
                int ttl = RandomUtil.randomInt(500, 3600);
                redisTemplate.opsForValue().set(dataKey, result, RedisConstant.ALBUM_TIMEOUT + ttl, TimeUnit.SECONDS);
                return result;
            } finally {
                //4.释放锁
                lock.unlock();
            }
        } catch (Throwable e) {
            //5.Redis服务不可用,兜底处理-查库即可
            return joinPoint.proceed();
        }
    }
}

2.4.3 测试分布式锁

在这个实现类AlbumInfoServiceImpl中添加注解:

/**
 * 根据专辑ID查询专辑信息包含专辑属性列表
 *
 * @param id
 * @return
 */
@Override
@GuiGuCache(prefix = "albumInfo:")
public AlbumInfo getAlbumInfoFromDB(Long id) {}

@Override
@GuiGuCache(prefix = "albumStat:")
public AlbumStatVo getAlbumStatVoByAlbumId(Long albumId) {
  //	调用mapper 层方法
  return albumInfoMapper.selectAlbumStat(albumId);
}

/**
 * 根据专辑ID获取专辑统计信息
 *
 * @param albumId
 * @return
 */
@Override
@GuiGuCache(prefix = "albumStatVo:")
public AlbumStatVo getAlbumStatVo(Long albumId) {}

在BaseCategoryServiceImpl实现类中添加注解:

/**
 * 根据三级分ID(专辑视图主键)查询专辑信息
 *
 * @param category3Id
 * @return
 */
@Override
@GuiGuCache(prefix="categoryViewBy3Id:"){}

3、布隆过滤器

3.1 什么是布隆过滤器

布隆过滤器(Bloom Filter),是1970年,由一个叫布隆的小伙子提出的,距今已经五十年了。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。二进制大家应该都清楚,存储的数据不是0就是1,默认是0。

主要用于判断一个元素是否在一个集合中,0代表不存在某个数据,1代表存在某个数据。

总结: 判断一个元素一定不存在 或者 可能存在! 存在一定的误判率{通过代码调节}

Bit 数组:

0 0 0 0 0 0 0 0 0

3.1.1 使用场景

大数据量的时候, 判断一个元素是否在一个集合中。解决缓存穿透问题

3.1.2 原理

存入过程

布隆过滤器上面说了,就是一个二进制数据的集合。当一个数据加入这个集合时,经历如下:

通过K个哈希函数计算该数据,返回K个计算出的hash值

这些K个hash值映射到对应的K个二进制的数组下标

将K个下标对应的二进制数据改成1。

如图所示:

查询过程

布隆过滤器主要作用就是查询一个数据,在不在这个二进制的集合中,查询过程如下:

1、通过K个哈希函数计算该数据,对应计算出的K个hash值

2、通过hash值找到对应的二进制的数组下标

3、判断:如果存在一处位置的二进制数据是0,那么该数据不存在。如果都是1,该数据可能存在集合中。

3.1.3 布隆过滤器的优缺点

优点

  1. 由于存储的是二进制数据,所以占用的空间很小
  2. 它的插入和查询速度是非常快的,时间复杂度是O(K),空间复杂度:O (M)。
    • K: 是哈希函数的个数
    • M: 是二进制位的个数
  3. 保密性很好,因为本身不存储任何原始数据,只有二进制数据

缺点

添加数据是通过计算数据的hash值,那么很有可能存在这种情况:两个不同的数据计算得到相同的hash值。

例如图中的“张三”和“张三丰”,假如最终算出hash值相同,那么他们会将同一个下标的二进制数据改为1。

这个时候,你就不知道下标为1的二进制,到底是代表“张三”还是“张三丰”。

一、存在误判

假如上面的图没有存 "张三",只存了 "张三丰",那么用"张三"来查询的时候,会判断"张三"存在集合中。

因为“张三”和“张三丰”的hash值是相同的,通过相同的hash值,找到的二进制数据也是一样的,都是1。

误判率:

​ 受三个因素影响二进制位的个数m, 哈希函数的个数k, 数据规模n (添加到布隆过滤器中的函数)

已知误判率p, 数据规模n, 求二进制的个数m,哈希函数的个数k {m,k 程序会自动计算 ,你只需要告诉我数据规模,误判率就可以了}

ln: 自然对数是以常数e为底数对数,记作lnN(N>0)。在物理学,生物学等自然科学中有重要的意义,一般表示方法为lnx。数学中也常见以logx表示自然对数。

二、删除困难

还是用上面的举例,因为“张三”和“张三丰”的hash值相同,对应的数组下标也是一样的。

如果你想去删除“张三”,将下标为1里的二进制数据,由1改成了0。

那么你是不是连“张三丰”都一起删了呀。

3.2 初始化布隆过滤器

service-search 模块的启动类中添加

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class ServiceSearchApplication implements CommandLineRunner {

  @Autowired
  private RedissonClient redissonClient;

  public static void main(String[] args) {
    SpringApplication.run(ServiceAlbumApplication.class, args);
  }

  /**
     * Springboot应用初始化后会执行一次该方法
     * @param args
     * @throws Exception
     */
  @Override
  public void run(String... args) throws Exception {
    //初始化布隆过滤器
    RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER);
    //设置数据规模 误判率 预计统计元素数量为100000,期望误差率为0.01
    bloomFilter.tryInit(100000, 0.01);
  }
}

上架专辑的时候,添加将数据添加到布隆过滤器中

service-search模块中SearchServiceImpl上架专辑方法upperAlbum

@Autowired
private RedissonClient redissonClient;

/**
     * 构建专辑索引库文档对象,新增专辑到索引库
     *
     * @param albumId
     */
@Override
public void upperAlbum(Long albumId) {
    //....省略代码
    //将新增的商品SKUID存入布隆过滤器
    //获取布隆过滤器,将新增skuID存入布隆过滤器
    RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER);
    bloomFilter.add(albumInfo.getId());
}

查看专辑的时候判断布隆过滤器中是否存在

service-search模块中ItemServiceImpl汇总专辑方法getItem

@Autowired
private RedissonClient redissonClient;

/**
 * 汇总专辑详情相关信息
 *
 * @param albumId
 * @return
 */
@Override
public Map<String, Object> getItem(Long albumId) {
    RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER);
    boolean contains = bloomFilter.contains(albumId);
    if(!contains){
        throw new GuiguException(ResultCodeEnum.DATA_NOT_FOUNT);
    }
    //.....省略代码
}