**谷粒随享** ## 第6章 详情优化 **学习目标:** - 微服务**分布式链路追踪组件**应用(微服务基础组件) - 针对于相对高并发业务场景进行性能优化 - 采用分布式缓存Redis - 缓存击穿(**分布式锁**) - 缓存穿透(**分布式布隆过滤器**) - 缓存雪崩(分布式集群) - 缓存一致性(**基于MySQL-BinLog**数据最终一致) - 采用异步任务+线程池 - 代码优化:利用AOP将冗余代码抽取-提供自定义缓存注解 # 1、链路追踪Zipkin ​ 一个看起来很简单的应用,背后可能需要数十或数百个服务来支撑,一个请求就要多次服务调用。当请求变慢、或者不能使用时,我们是不知道是哪个后台服务引起的。这时,我们使用 Zipkin 就能解决这个问题。由于业务访问量的增大,业务复杂度增加,以及微服务架构和容器技术的兴起,要对系统进行各种拆分。微服务系统拆分后,我们可以使用 Zipkin 链路,来快速定位追踪有故障的服务点。 **Zipkin/SkyWalking** 是一款开源的分布式实时数据追踪系统(Distributed Tracking System),能够收集服务间调用的时序数据,提供调用链路的追踪。 **Zipkin** 其主要功能是聚集来自各个异构系统的实时监控数据,在微服务架构下,十分方便地用于服务响应延迟等问题的定位。 **Zipkin** 每一个调用链路通过一个 trace id 来串联起来,只要你有一个 trace id ,就能够直接定位到这次调用链路,并且可以根据服务名、标签、响应时间等进行查询,过滤那些耗时比较长的链路节点。 **Zipkin 分布式跟踪系统**就能非常好地解决该问题,**主要解决以下3点问题:** - 1. 动态展示服务的链路; - 2. 分析服务链路的瓶颈并对其进行调优; - 3. 快速进行服务链路的故障发现; 在优化前通过Zipkin链路追踪进行查看接口耗时 1. 在`service-util`工具模块pom.xml中新增Zipkin相关依赖 ```xml io.micrometer micrometer-tracing-bridge-brave io.zipkin.reporter2 zipkin-reporter-brave io.micrometer micrometer-observation org.springframework.boot spring-boot-starter-actuator org.springframework.cloud spring-cloud-starter-zipkin 2.2.8.RELEASE io.github.openfeign feign-micrometer 12.5 ``` 2. 在Nacos配置`common.yaml`中新增配置(**已完成**) ```yaml management: zipkin: tracing: endpoint: http://192.168.200.6:9411/api/v2/spans tracing: sampling: probability: 1.0 # 记录速率100% ``` 3. 解决异步任务+多线程导致异步Feign请求无法被链路追踪,故需要未线程池设置装饰器(将当前线程内上下文中trace_id传递到子线程中),故将当日资料中复制到`service-util`且设置Spring线程池对象装饰器 ![image-20240518114858404](assets/image-20240518114858404.png) ```java //设置解决zipkin链路追踪不完整装饰器对象 taskExecutor.setTaskDecorator(new ZipkinTaskDecorator(zipkinHelper)); ``` 4. 访问专辑详情接口、或查看专辑页面进行测试 http://localhost:8502/doc.html#/web-api/%E4%B8%93%E8%BE%91%E8%AF%A6%E6%83%85%E7%AE%A1%E7%90%86/getItem 5. 通过Zipkin管理页面查看接口耗时 ![image-20231007101258763](assets/image-20231007101258763.png) # 2、详情优化 虽然咱们实现了页面需要的功能,但是考虑到该页面是被用户高频访问的,所以性能需要优化。一般一个系统最大的性能瓶颈,就是数据库的io操作。从数据库入手也是调优性价比最高的切入点。 一般分为两个层面: - 一是提高数据库sql本身的性能 - 单表 - 全值匹配我最爱、最左前缀要遵守/带头大哥不能死、中间兄弟不能断 - 索引列上少计算、LIKE符号写最右、 - 多表 - 小表(驱动表,在查询条件列上建立索引)驱动大表(被驱动表,关联字段,查询条件,上建立索引) - 减少子查询,优先使用关联查询 - 关联字段类型一致 、表编码一致 - 二是尽量避免直接查询数据库。 重点要讲的是第二个层面:尽量避免直接查询数据库。 解决办法就是:**分布式缓存** ## 2.1 缓存常见问题 缓存最常见的4个问题: 1. 缓存穿透 2. 缓存雪崩 3. 缓存击穿 4. 缓存一致性 **缓存穿透**: 是指查询一个**不存在的数据**,由于缓存无法命中,将去查询数据库,但是数据库也无此记录,并且出于容错考虑,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。 - 解决1 :空结果也进行缓存,但它的过期时间会很短,最长不超过五分钟,但是不能防止随机穿透。 - 解决2 :使用**布隆过滤器**来解决随机穿透问题。 ![](assets/tingshu019.jpg) **缓存雪崩**:是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。 - 解决1:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。 - 解决2:如果单节点宕机,可以采用集群部署方式防止雪崩 **缓存击穿**: 是指对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:如果这个key在大量请求同时进来之前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。 与缓存雪崩的区别: 1. 击穿是一个热点key失效 2. 雪崩是很多key集体失效 - 解决:锁 ![](assets/tingshu020.jpg) ## 2.2 缓存击穿解决方案 ### 2.2.1 分布式锁 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种**跨JVM的互斥机制**来控制**共享资源**的访问,这就是分布式锁要解决的问题! 分布式锁主流的实现方案: 1. 基于数据库实现分布式锁 2. 基于缓存( Redis等) 3. 基于Zookeeper 每一种分布式锁解决方案都有各自的优缺点: 1. 性能:Redis最高 2. 可靠性:Zookeeper最高(CAP定理-CP) ### 2.2.2 使用 Redisson 解决分布式锁 **Redisson是一个在Redis的基础上实现的Java驻内存数据网格**(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的**宗旨是促进使用者对Redis的关注分离**(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。 官方文档地址:[https://github.com/Redisson/Redisson/wiki](https://github.com/Redisson/Redisson/wiki "https://github.com/Redisson/Redisson/wiki") 实现步骤: 1. 在`service-util` 模块中解开redisson相关依赖(注释放开) ```xml org.redisson redisson-spring-boot-starter ``` 2. 在`service-util` 模块中添加配置类 ```java package com.atguigu.tingshu.common.config.redis; import lombok.Data; import org.apache.commons.lang3.StringUtils; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SingleServerConfig; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * redisson配置信息 */ @Data @Configuration @ConfigurationProperties("spring.data.redis") public class RedissonConfig { private String host; private String password; private String port; private int timeout = 3000; private static String ADDRESS_PREFIX = "redis://"; /** * 自动装配 */ @Bean RedissonClient redissonSingle() { Config config = new Config(); if (StringUtils.isBlank(host)) { throw new RuntimeException("host is empty"); } SingleServerConfig serverConfig = config.useSingleServer() .setAddress(ADDRESS_PREFIX + this.host + ":" + port) .setTimeout(this.timeout); if (StringUtils.isNotBlank(this.password)) { serverConfig.setPassword(this.password); } return Redisson.create(config); } } ``` 3. 优化代码 ```java package com.atguigu.tingshu.album.service.impl; import com.alibaba.cloud.commons.lang.StringUtils; import com.atguigu.tingshu.album.service.TestService; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; /** * @author atguigu * @ClassName TestServiceImpl * @description: TODO * @date 2023年08月22日 * @version: 1.0 */ @Service public class TestServiceImpl implements TestService { @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedissonClient redissonClient; @Override public void testLock() { try { //先尝试获取分布式锁 1.创建锁对象 2.调用获取锁方法 RLock lock = redissonClient.getLock("lock"); //获取锁成功后,才执行业务 lock.lock(); //阻塞到获取锁成功为止 //lock.tryLock(5, TimeUnit.SECONDS); //参数1:等待锁获取最大时间 参数2:时间单位 获取锁失败返回false //lock.tryLock(3, 5, TimeUnit.SECONDS);//参数1:等待锁获取最大时间 参数2:锁过期时间,参数3:时间单位 获取锁失败返回false try { //1.从Redis缓存中获取key="num"的值 保证redis中存在"num"(手动提前在redis中创建key) String value = redisTemplate.opsForValue().get("num"); if (StringUtils.isBlank(value)) { return; } //2.对获取到值进行+1操作 int num = Integer.parseInt(value); redisTemplate.opsForValue().set("num", String.valueOf(++num)); } finally { //业务执行完毕释放锁 lock.unlock(); } } catch (Exception e) { e.printStackTrace(); } } } ``` 分布式锁: ``` ab -n 5000 -c 100 http://192.168.200.1:8500/api/album/test/testLock ``` ![](assets/tingshu021.png) ## 2.3 分布式锁整合业务 ```java @Autowired private RedisTemplate redisTemplate; @Autowired private RedissonClient redissonClient; /** * 根据专辑ID查询专辑信息(包括专辑标签列表) * 通过Redisson分布式锁解决缓存击穿问题 * * @param id 专辑ID * @return 专辑信息 */ @Override public AlbumInfo getAlbumInfo(Long id) { try { //1.优先从缓存Redis中获取业务数据 //1.1 构建专辑业务数据Key 形式:前缀+专辑ID String dataKey = RedisConstant.ALBUM_INFO_PREFIX + id; //1.2 查询缓存中专辑信息 AlbumInfo albumInfo = (AlbumInfo) redisTemplate.opsForValue().get(dataKey); if (albumInfo != null) { //2.如果命中缓存,直接返回业务数据即可(不需要查库) return albumInfo; } //3.如果未命中缓存,先获取分布式锁 //3.1 构建当前业务数据锁的Key 形式:业务key+锁的后缀 String lockKey = dataKey + RedisConstant.CACHE_LOCK_SUFFIX; //3.2 构建当前业务数据锁的对象 RLock lock = redissonClient.getLock(lockKey); //3.3 尝试获取分布式锁 tryLock默认锁过期时间30s底层还有看门狗机制进行锁自动续期 boolean flag = lock.tryLock(); if (flag) { try { //4.获取分布锁锁成功,执行查询数据库,并设置缓存 AlbumInfo albumInfoFromDB = this.getAlbumInfoFromDB(id); int ttl = RandomUtil.randomInt(100, 600); redisTemplate.opsForValue().set(dataKey, albumInfoFromDB, RedisConstant.ALBUM_TIMEOUT + ttl, TimeUnit.SECONDS); return albumInfoFromDB; } catch (Exception e) { throw new RuntimeException(e); } finally { //5.释放分布式锁 lock.unlock(); } } else { //6.获取分布式锁失败,进行自旋(自旋可能获取锁成功线程会将业务数据已经放入缓存) return this.getAlbumInfo(id); } } catch (Exception e) { log.error("[专辑服务]获取专辑信息锁服务异常异常:{},执行兜底处理方案:{}", e, "直接查询数据库"); return this.getAlbumInfoFromDB(id); } } /** * 抽取单独查询专辑数据业务方法 * * @param id * @return */ @Override public AlbumInfo getAlbumInfoFromDB(Long id) { //1.根据专辑ID查询专辑信息 AlbumInfo albumInfo = albumInfoMapper.selectById(id); //TODO 业务校验,校验专辑状态是否为下架状态,只有下架状态专辑才可以被修改 //2.根据专辑ID查询专辑标签列表 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(AlbumAttributeValue::getAlbumId, id); List albumAttributeValues = albumAttributeValueMapper.selectList(queryWrapper); if (CollectionUtil.isNotEmpty(albumAttributeValues)) { albumInfo.setAlbumAttributeValueVoList(albumAttributeValues); } return albumInfo; } ``` ## 2.4 AOP与分布式锁整合 专辑详情中多项数据接口需要查询缓存,那么分布式锁的业务逻辑代码就会出现大量的重复。因此,我们可以参考Spring框架事务注解来实现简化代码。只要类上添加了一个注解,那么这个注解就会自带分布式锁的功能。 实现如下: - 在`service-util` 模块中添加一个注解 - 提供切面类,采用环绕通知对自定义注解进行增强 ### 2.4.1 自定义注解 ```java package com.atguigu.tingshu.common.cache; import java.lang.annotation.*; /** * 自定义缓存注解 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface GuiGuCache { /** * 缓存业务及分布式锁key的前缀 * @return */ String prefix() default "data:"; } ``` ### 2.4.2 自定义切面类 ```java package com.atguigu.tingshu.common.cache; import cn.hutool.core.util.RandomUtil; import com.atguigu.tingshu.common.constant.RedisConstant; import com.atguigu.tingshu.common.login.GuiGuLogin; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * @author: atguigu * @create: 2024-08-17 08:57 */ @Slf4j @Aspect @Component public class GuiGuCacheAspect { @Autowired private RedisTemplate redisTemplate; @Autowired private RedissonClient redissonClient; /** * 自定义缓存注解的切面逻辑 * * @param pjp 切入点对象 * @param guiGuCache 方法使用使用注解对象 * @return * @throws Throwable */ @Around("@annotation(guiGuCache)") public Object doBasicProfiling(ProceedingJoinPoint pjp, GuiGuCache guiGuCache) throws Throwable { try { //1.优先从缓存Redis中获取业务数据 //1.1 构建业务数据key 形式=注解前缀+方法参数 //1.1.1 获取注解前缀 String prefix = guiGuCache.prefix(); //1.1.2 获取方法参数如果存在多个参数采用_拼接 String params = "none"; Object[] args = pjp.getArgs(); if (args != null && args.length > 0) { List objects = Arrays.asList(args); params = objects.stream() .map(Object::toString) .collect(Collectors.joining("_")); } String dataKey = prefix + params; //1.2 查询Redis获取业务数据 Object objectResult = redisTemplate.opsForValue().get(dataKey); //2.如果命中缓存,直接返回业务数据即可(不需要查库) if (objectResult != null) { return objectResult; } //3.如果未命中缓存,先获取分布式锁 //3.1 构建分布式锁key String lockKey = dataKey + RedisConstant.CACHE_LOCK_SUFFIX; //3.2 获取锁对象 RLock lock = redissonClient.getLock(lockKey); //3.3 获取分布式锁 boolean flag = lock.tryLock(); //4.获取分布锁锁成功,执行查询数据库(目标方法执行),并设置缓存 if (flag) { try { //4.1 执行目标方法(执行自定义注解修饰方法-查询数据库方法) objectResult = pjp.proceed(); //4.2 将查询数据库业务数据放入Redis缓存中 int ttl = RandomUtil.randomInt(100, 600); redisTemplate.opsForValue().set(dataKey, objectResult, RedisConstant.ALBUM_TIMEOUT + ttl, TimeUnit.SECONDS); return objectResult; } finally { //5.释放分布式锁 lock.unlock(); } } else { //6.获取分布式锁失败,进行自旋(自旋可能获取锁成功线程会将业务数据已经放入缓存) return this.doBasicProfiling(pjp, guiGuCache); } } catch (Throwable e) { //7.兜底处理-直接查询数据库 log.error("[自定义缓存切面]Redis服务不可用,执行兜底方案,查库。异常:{}", e); return pjp.proceed(); } } } ``` ### 2.4.3 测试分布式锁 在这个实现类**AlbumInfoServiceImpl**中添加注解: ```java /*** * 根据专辑ID查询专辑信息包含专辑标签列表 * @param id 专辑ID * @return */ @Operation(summary = "根据专辑ID查询专辑信息包含专辑标签列表") @GetMapping("/albumInfo/getAlbumInfo/{id}") public Result getAlbumInfo(@PathVariable Long id) { //AlbumInfo albumInfo = albumInfoService.getAlbumInfo(id); AlbumInfo albumInfo = albumInfoService.getAlbumInfoFromDB(id); return Result.ok(albumInfo); } /*** * 根据专辑ID查询专辑信息包含专辑标签列表 * @param id 专辑ID * @return */ @Override @GuiGuCache(prefix = RedisConstant.ALBUM_INFO_PREFIX) public AlbumInfo getAlbumInfoFromDB(Long id) {...} /** * 根据专辑ID查询专辑统计信息 * * @param albumId 专辑ID * @return */ @Override @GuiGuCache(prefix = RedisConstant.ALBUM_INFO_PREFIX+":stat:") public AlbumStatVo getAlbumStatVo(Long albumId){...} ``` 在BaseCategoryServiceImpl实现类中添加注解: ```java /** * 根据一级分类Id获取分类标签以及标签值 * * @param category1Id * @return */ @Override @GuiGuCache(prefix="category:baseAttribute:") //将该方法执行查库结果放入缓存,采用分布式锁杜绝缓存击穿 public List getAttributeByCategory1Id(Long category1Id){} @Override @GuiGuCache(prefix = "category:categoryview:") public BaseCategoryView getCategoryView(Long category3Id){} /** * 根据1级分类ID查询该分类下前7个置顶3级分类列表 * * @param category1Id * @return */ @Override @GuiGuCache(prefix = "category:topcategory3:") public List getTopBaseCategory3(Long category1Id){} /** * 根据1级分类ID查询包含二级分类以及三级分类 * * @param category1Id * @return */ @Override @GuiGuCache(prefix = "category:categoryby1Id:") public JSONObject getBaseCategoryListByCategory1Id(Long category1Id){} ``` 在UserInfoServiceImpl实现类中添加注解: ```java @Override @GuiGuCache(prefix = "user:") public UserInfoVo getUserInfo(Long userId) {} ``` # 3、布隆过滤器 ## 3.1 什么是布隆过滤器 布隆过滤器(Bloom Filter),是1970年,由一个叫布隆的小伙子提出的,距今已经五十年了。它实际上是一个很长的[二进制](https://baike.baidu.com/item/二进制/361457 "二进制")向量和**一系列随机映射函数**。布隆过滤器可以用于[检索](https://baike.baidu.com/item/检索/11003896 "检索")一个元素是否在一个[集合](https://baike.baidu.com/item/集合/2908117 "集合")中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。二进制大家应该都清楚,存储的数据不是0就是1,默认是0。 主要用于判断一个元素是否在一个集合中,0代表不存在某个数据,1代表存在某个数据。 总结: **判断一个元素一定不存在 或者 可能存在!** 存在一定的误判率{通过代码调节} Bit 数组: | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | | | | | | | | | | ### 3.1.1 使用场景 大数据量的时候, 判断一个元素是否在一个集合中。解决缓存穿透问题 ### 3.1.2 原理 **存入过程 ** 布隆过滤器上面说了,就是一个二进制数据的集合。当一个数据加入这个集合时,经历如下: 通过**K个哈希函数**计算该数据,返回**K个**计算出的**hash值** 这些K个hash值映射到对应的K个二进制的数组下标 将K个下标对应的二进制数据改成1。 如图所示: ![](assets/tingshu022.jpg) 查询过程 布隆过滤器主要作用就是查询一个数据,在不在这个二进制的集合中,查询过程如下: 1、通过K个哈希函数计算该数据,对应计算出的K个hash值 2、通过hash值找到对应的二进制的数组下标 3、判断:如果存在一处位置的二进制数据是0,那么该数据不存在。如果都是1,该数据**可能**存在集合中。 ### 3.1.3 布隆过滤器的优缺点 优点 1. 由于存储的是二进制数据,所以占用的空间很小 2. 它的插入和查询速度是非常快的,时间复杂度是O(K),空间复杂度:O (M)。 - K: 是哈希函数的个数 - M: 是二进制位的个数 3. 保密性很好,因为本身不存储任何原始数据,只有二进制数据 缺点 添加数据是通过计算数据的hash值,那么很有可能存在这种情况:两个不同的数据计算得到相同的hash值。 ![](assets/tingshu023.jpg) 例如图中的“张三”和“张三丰”,假如最终算出hash值相同,那么他们会将同一个下标的二进制数据改为1。 这个时候,你就不知道下标为1的二进制,到底是代表“张三”还是“张三丰”。 一、存在误判 假如上面的图没有存 "张三",只存了 "张三丰",那么用"张三"来查询的时候,会判断"张三"存在集合中。 因为“张三”和“张三丰”的hash值是相同的,通过相同的hash值,找到的二进制数据也是一样的,都是1。 误判率: ​ 受三个**因素影响**: **二进制位的个数m**, 哈希**函数**的个数k, **数据规模n** (添加到布隆过滤器中的函数) ![](assets/tingshu024.jpg) 已知误判率p, 数据规模n, 求二进制的个数m,哈希函数的个数k {m,k 程序会自动计算 ,你只需要告诉我数据规模,误判率就可以了} ![](assets/tingshu025.jpg) ln: 自然对数是以常数e为[底数](https://baike.baidu.com/item/底数/5416651 "底数")的[对数](https://baike.baidu.com/item/对数/91326 "对数"),记作lnN(N>0)。在物理学,生物学等自然科学中有重要的意义,一般表示方法为lnx。数学中也常见以logx表示自然对数。 二、删除困难 还是用上面的举例,因为“张三”和“张三丰”的hash值相同,对应的数组下标也是一样的。 如果你想去删除“张三”,将下标为1里的二进制数据,由1改成了0。 那么你是不是连“张三丰”都一起删了呀。 ## 3.2 初始化布隆过滤器 在`service-search` 模块的启动类中添加 ```java package com.atguigu.tingshu; import com.atguigu.tingshu.common.constant.RedisConstant; import org.redisson.api.RBloomFilter; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源自动配置 @EnableDiscoveryClient @EnableFeignClients//(basePackages = {"a","b"}) //确保能够扫描到依赖中OpenFeign接口 @EnableAsync public class ServiceSearchApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(ServiceSearchApplication.class, args); } @Autowired private RedissonClient redissonClient; /** * 当springboot应用启动成功后执行 * * @param args incoming main method arguments * @throws Exception */ @Override public void run(String... args) throws Exception { //初始化布隆过滤器 RBloomFilter bloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER); //判断布隆过滤器是否存在 if (!bloomFilter.isExists()) { //通过数据规模+误判率进行初始化布隆过滤器 bloomFilter.tryInit(100000, 0.03); } } } ``` ### 上架专辑的时候,添加将数据添加到布隆过滤器中 `service-search`模块中**SearchServiceImpl**上架专辑方法**upperAlbum** ```java @Autowired private RedissonClient redissonClient; /** * 构建专辑索引库文档对象,新增专辑到索引库 * * @param albumId */ @Override public void upperAlbum(Long albumId) { //....省略代码 //将新增的商品SKUID存入布隆过滤器 //获取布隆过滤器,将新增skuID存入布隆过滤器 RBloomFilter bloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER); bloomFilter.add(albumInfo.getId()); } ``` ### 查看专辑的时候判断布隆过滤器中是否存在 `service-search`模块中**ItemServiceImpl**汇总专辑方法**getItemInfo()** ```java @Autowired private RedissonClient redissonClient; /** * 汇总专辑详情相关信息 * * @param albumId * @return */ @Override public Map getAlbumItem(Long albumId) { //0.查询布隆过滤器是否包含查询专辑ID RBloomFilter bloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER); boolean flag = bloomFilter.contains(albumId); if (!flag) { throw new GuiguException(404, "访问专辑不存在"); } //.....省略代码 } ``` ## 3.3 布隆过滤器扩容 采用定时任务机制进行布隆过滤器扩容 ```java package com.atguigu.tingshu.album.job; import cn.hutool.core.collection.CollectionUtil; import com.atguigu.tingshu.album.mapper.AlbumInfoMapper; import com.atguigu.tingshu.common.constant.RedisConstant; import com.atguigu.tingshu.model.album.AlbumInfo; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBloomFilter; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; /** * @author: atguigu * @create: 2024-08-17 14:10 */ @Slf4j @Component public class RebuildBloomFilter { @Autowired private RedissonClient redissonClient; @Autowired private AlbumInfoMapper albumInfoMapper; /** * 每月1号凌晨3点整重建布隆过滤器 cron表达式:0 0 3 * * ? * 测试每隔10s进行一次扩容 */ @Scheduled(cron = "0 0 3 * * ?") public void rebuildBloomFilter() { log.info("重建布隆过滤器定时任务执行"); //1.获取"旧"的布隆过滤器-获取现有数据量 RBloomFilter oldBloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER); //2.判断是否需要进行扩容,扩容条件:现有数量>期望数据规模 if (oldBloomFilter.count() >= oldBloomFilter.getExpectedInsertions()) { //2.1 创建"新"的布隆过滤器完成初始化 RBloomFilter newBloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER + ":new"); newBloomFilter.tryInit(oldBloomFilter.getExpectedInsertions() * 2, oldBloomFilter.getFalseProbability()); //2.2 删除"旧"的布隆过滤器 oldBloomFilter.delete(); //2.3 再对新的布隆进行重命名:改回旧的名称 newBloomFilter.rename(RedisConstant.ALBUM_BLOOM_FILTER); //3.查询专辑表得到最新库中审核通过的专辑ID列表 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(AlbumInfo::getStatus, "0301"); queryWrapper.select(AlbumInfo::getId); List albumInfoList = albumInfoMapper.selectList(queryWrapper); //4.将专辑ID加入到"新"的布隆过滤器 if (CollectionUtil.isNotEmpty(albumInfoList)) { for (AlbumInfo albumInfo : albumInfoList) { newBloomFilter.add(albumInfo.getId()); } } } } } ``` # 4、数据一致性方案 ## 4.1 缓存的更新方案(传统) 先更新DB还是先更新缓存?是更新缓存还是删除缓存?在常规情况下,怎么操作都可以,但一旦存在高并发场景,就需要采用合适的方案。 **1、先更新数据库再更新缓存**(双写策略1) 线程A:更新数据库(第1s)——> 更新缓存(第10s) 线程B:更新数据库 (第3s)——> 更新缓存(第5s) 并发场景下,这样的情况是很容易出现的,每个线程的操作先后顺序不同,这样就导致请求B的缓存值被请求A给覆盖了,**数据库中是线程B的新值,缓存中是线程A的旧值**,并且会一直这么脏下去直到缓存失效(设置了过期时间)。 ![](assets/image-20230613155008778.png) **2、先更新缓存再更新数据库(双写策略2)** 线程A:更新缓存(第1s)——> 更新数据库(第10s) 线程B: 更新缓存(第3s)——> 更新数据库(第5s) 和前面一种情况相反,**缓存中是线程B的新值,而数据库中是线程A的旧值。** ![](assets/image-20230613155016668.png) 前两种方式之所以会在并发场景下出现异常,**本质上是因为更新缓存和更新数据库是两个操作**,我们没有办法控制并发场景下两个操作之间先后顺序,也就是先开始操作的线程先完成自己的工作。 **3、先删除缓存再更新数据库** 通过这种方式,我们很惊喜地发现,前面困扰我们的并发场景的问题确实被解决了!两个线程都只修改数据库,不管谁先,数据库以之后修改的线程为准。 但这个时候,我们来思考另一个场景:两个并发操作,一个是更新操作,另一个是查询操作,更新操作删除缓存后,查询操作没有命中缓存,先把老数据读出来后放到缓存中,然后更新操作更新了数据库。于是,在缓存中的数据还是老的数据,导致缓存中的数据是脏的。很显然,这种状况也不是我们想要的。 ![](assets/image-20230613155025378.png) ## 4.2 延时双删方案 修改数据业务逻辑 1.删除缓存 2.更新数据库 **3.睡眠一段时间(500ms)** 4.再次删除缓存 加了个睡眠时间,**主要是为了确保请求 A (更新)在睡眠的时候,请求 B (查询)能够在这这一段时间完成「从数据库读取数据,再把缺失的缓存写入缓存」的操作**,然后请求 B 睡眠完,再删除缓存。 所以,请求 B 的睡眠时间就需要大于请求A「从数据库读取数据 + 写入缓存」的时间。 但是具体睡眠多久其实是个**玄学**,很难评估出来,所以这个方案也只是尽可能保证一致性而已,极端情况下,依然也会出现缓存不一致的现象。因此,还是不太建议这种方案。 ![](assets/20230306205432-1679317608469-1686644815214.png) 对于蓝色的文字,“删除缓存 10”必须在“回写缓存10”后面,那如何才能保证一定是在后面呢?**让请求 A 的最后一次删除,等待 500ms。** ## 4.3 分布式读写锁 - 读读允许并发 - 读写不允许并发 - 写读不允许并发 - 写写不允许并发 依赖“锁”的机制,避免出现并发读写。弊端:性能低 **读数据方法(查询操作)** ```java @Autowired private RedissonClient redissonClient; /** * 读数据方法 * * @param id * @return */ @Override public Long read(Long id) { //1.创建读写锁对象 锁名称前缀:数据标识 RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("myLock:" + id); RLock lock = readWriteLock.readLock(); //2.获取读锁成功 执行读操作 读锁持有期间,不允许并发发写,允许并发读 try { lock.lock(5, TimeUnit.SECONDS); log.info(Thread.currentThread().getName() + "执行读数据操作"); return id; } finally { //3.释放读锁 //lock.unlock(); } } ``` **修改数据方法(修改操作)** ```java /** * 写数据方法 * * @param id */ @Override public void write(Long id) { //1.创建读写锁对象 锁名称前缀:数据标识 RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("myLock:" + id); //2.获取写锁成功 执行写操作 写锁持有期间,不允许并发读,并发写 RLock lock = readWriteLock.writeLock(); lock.lock(5, TimeUnit.SECONDS); try { log.info(Thread.currentThread().getName() + "执行写入操作"); } finally { //3.释放写锁 //lock.unlock(); } } ``` ## 4.4 Binlog方案 ### 4.4.1 Canal服务端 「**先更新数据库,再删缓存**」的策略的第一步是更新数据库,那么更新数据库成功,就会产生一条变更日志,记录在 binlog 里。 于是我们就可以通过订阅 binlog 日志,拿到具体要操作的数据,然后再执行缓存删除,阿里巴巴开源的 **Canal** 中间件就是基于这个实现的。 Canal 模拟 MySQL 主从复制的交互协议,把自己伪装成一个 MySQL 的从节点,向 MySQL 主节点发送 dump 请求,MySQL 收到请求后,就会开始推送 Binlog 给 Canal,Canal 解析 Binlog 字节流之后,转换为便于读取的结构化数据,供下游程序订阅使用。 下图是 Canal 的工作原理: ![image-20231127154824984](assets/image-20231127154824984.png) 所以,**如果要想保证「先更新数据库,再删缓存」策略第二个操作能执行成功,我们可以使用「消息队列来重试缓存的删除」,或者「订阅 MySQL binlog 再操作缓存」,这两种方法有一个共同的特点,都是采用异步操作缓存。** 1. 验证MySQL是否开启BinLog确保Value为 ON(已完成) ```shell show variables like '%log_bin%'; ``` ![image-20230925213159314](assets/image-20230925213159314.png) 2. 在**MySQL**新增用户用于监听Binlog日志,在Canal容器中要使用该用户(已完成) ```mysql #创建用户 CREATE USER canal IDENTIFIED BY '123456'; #给用户授权-所有库所有表主从同步权限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; #如果是MySQL8.X以上需要对加密方式进行设置 ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456'; #刷新生效 FLUSH PRIVILEGES; ``` 2. 查询MySQL主节点状态;完成重置 ``` SHOW MASTER STATUS; reset master; ``` ![image-20240419144646434](assets/image-20240419144646434.png) ![image-20240521113105841](assets/image-20240521113105841.png) 3. 采用Docker方式创建Canal服务端。以下为创建Canal容器命令,要修改要监听主MySQL数据库IP跟端口用户名以及密码确保正确(不需要执行) > -e canal.instance.filter.regex=.*\\..* 代表监控所有数据库表变更的日志 ```shell docker run -p 11111:11111 --name canal \ -e canal.destinations=tingshuTopic \ -e canal.instance.master.address=192.168.200.6:3306 \ -e canal.instance.dbUsername=canal \ -e canal.instance.dbPassword=123456 \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false \ -e canal.instance.filter.regex=^tingshu_.*\\..* \ -d canal/canal-server:v1.1.8 ``` 4. 删除容器,重新创建canal容器,促使canal重新跟MySQL建立连接,从MySQL最新BinLog偏移量开始读取数据 ![image-20240521113606990](assets/image-20240521113606990.png) 5. Canal日志在容器 ``` /home/admin/canal-server/logs/tingshuTopic 日志路径 ``` ![image-20240521113716936](assets/image-20240521113716936.png) ### 4.4.2 Canal客户端 1. 在`service`父工程,下创建独立新模块`service-cdc`在java项目中引入依赖 ```xml service com.atguigu.tingshu 1.0 4.0.0 service-cdc 17 17 UTF-8 io.github.xizixuejie canal-spring-boot-starter 0.0.17 ``` 2. 提供启动类 ```java package com.atguigu.tingshu; import io.xzxj.canal.spring.annotation.EnableCanalListener; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; /** * @author: atguigu * @create: 2025-03-21 15:46 */ @EnableCanalListener @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) public class CDCApplicaiton { public static void main(String[] args) { SpringApplication.run(CDCApplicaiton.class, args); } } ``` 3. 在Nacos创建配置文件`service-canal-dev.yaml` 内容如下: ![image-20250321170122534](assets/image-20250321170122534.png) ```yaml server: port: 8601 canal: server: 192.168.200.6:11111 destination: tingshuTopic ``` 4. 项目本地创建 `bootstrap.properties` 配置文件 ```properties spring.application.name=service-canal spring.profiles.active=dev spring.main.allow-bean-definiton-overriding=true spring.cloud.nacos.discovery.server-addr=192.168.200.6:8848 spring.cloud.nacos.config.server-addr=192.168.200.6:8848 spring.cloud.nacos.config.prefix=${spring.application.name} spring.cloud.nacos.config.file-extension=yaml spring.cloud.nacos.config.shared-configs[0].data-id=common.yaml ``` 5. 监听到变更业务处理类-用户信息 ```java package com.atguigu.tingshu.listener; import com.atguigu.tingshu.model.user.UserInfo; import io.xzxj.canal.core.annotation.CanalListener; import io.xzxj.canal.core.listener.EntryListener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import java.util.Set; /** * @author: atguigu * @create: 2025-03-21 15:47 */ @Slf4j @CanalListener(destination = "tingshuTopic", schemaName = "tingshu_user", tableName = "user_info") public class UserListener implements EntryListener { @Autowired private RedisTemplate redisTemplate; /** * 监听用户表更新回调方法 * @param before * @param after * @param fields */ @Override public void update(UserInfo before, UserInfo after, Set fields) { log.info("[cdc]监听到变更数据"); String redisKey = "user:userinfo:"+after.getId(); redisTemplate.delete(redisKey); } } ``` 6. 监听到变更业务处理类-专辑信息 ```java package com.atguigu.tingshu.listener; import com.atguigu.tingshu.model.album.AlbumInfo; import io.xzxj.canal.core.annotation.CanalListener; import io.xzxj.canal.core.listener.EntryListener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import java.util.Set; /** * @author: atguigu * @create: 2025-03-22 08:46 */ @Slf4j @CanalListener(destination = "tingshuTopic", schemaName = "tingshu_album", tableName = "album_info") public class AlbumListener implements EntryListener { @Autowired private RedisTemplate redisTemplate; /** * 监听到转机信息表更新 * @param before 变更前数据 * @param after 变更后的数据 * @param fields 变更字段 */ @Override public void update(AlbumInfo before, AlbumInfo after, Set fields) { log.info("[cdc]监听到变更数据,变更前:{},变更后:{}", before, after); Long id = after.getId(); String redisKey = "album:info:"+id; redisTemplate.delete(redisKey); } } ``` **tips:如果启动canal客户端长时间未监听到变更数据,做以下操作** 1. 重置MySQL binglog ``` #重置MySQL主节点Binglog reset master; SHOW MASTER STATUS; ``` 2. 重新创建canal容器 ![image-20240419153629312](assets/image-20240419153629312.png)