## 第6章-商品详情进阶优化 **学习目标:** - 商品详情页优化方案 - 掌握Redis实现分布式锁方式 - 掌握Redisson实现Redis分布式锁 - 分布式锁+AOP实现缓存 - 布隆过滤器的使用场景及应用 # 1、商品详情页面优化 ## 1.1 思路 虽然咱们实现了页面需要的功能,但是考虑到该页面是被用户高频访问的,所以性能需要优化。一般一个系统最大的性能瓶颈,就是数据库的io操作。从数据库入手也是调优性价比最高的切入点。 一般分为两个层面: - 一是提高数据库sql本身的性能 - 二是尽量避免直接查询数据库。 重点要讲的是另外一个层面:尽量避免直接查询数据库。 解决办法就是:**缓存** ## 1.2 整合Redis到工程 由于Redis作为缓存数据库,要被多个项目使用,所以要制作一个通用的工具类,方便工程中的各个模块使用。 而主要使用Redis的模块,都是后台服务的模块,service工程。所以咱们把Redis的工具类放到service-util模块中,这样所有的后台服务模块都可以使用Redis。 ### 1.2.1 首先在service-util引入依赖包 ```xml org.springframework.boot spring-boot-starter-data-redis org.apache.commons commons-pool2 2.6.0 ``` ### 1.2.2 添加Redis配置类 在`service-util`配置自定义RedisTemplate对象-设置Key,Val的序列化方式 ```java package com.atguigu.gmall.common.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration @EnableCaching public class RedisConfig { @Primary @Bean public RedisTemplate RedisTemplate(RedisConnectionFactory RedisConnectionFactory) { RedisTemplate RedisTemplate = new RedisTemplate<>(); RedisTemplate.setConnectionFactory(RedisConnectionFactory); //使用Jackson2JsonRedisSerialize 替换默认序列化(默认采用的是JDK序列化) 对存储的对象进行JSON序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 序列化key value RedisTemplate.setKeySerializer(new StringRedisSerializer()); RedisTemplate.setValueSerializer(jackson2JsonRedisSerializer); RedisTemplate.setHashKeySerializer(new StringRedisSerializer()); RedisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); RedisTemplate.afterPropertiesSet(); return RedisTemplate; } } ``` ![image-20221228191445639](assets/image-20221228191445639.png) 说明:由于`service-util`属于公共模块,所以我们把它引入到service父模块,其他service子模块都自动引入了 ## 1.3 Redis业务开发相关规则 开始开发先说明Redis key的命名规范,由于Redis不像数据库表那样有结构,其所有的数据全靠key进行索引,所以Redis数据的可读性,全依靠key。 企业中最常用的方式就是: ```properties object:id:field ``` 比如:sku:1314:info ​ user:1092:info **:** 表示根据windows的 / 一个意思 在RedisConst中定义Redis的常量,RedisConst类在service-util模块中,所有的Redis常量我们都配置在这里 ```java package com.atguigu.gmall.common.constant; /** * Redis常量配置类 * */ public class RedisConst { public static final String SKUKEY_PREFIX = "sku:"; public static final String SKUKEY_SUFFIX = ":info"; //单位:秒 public static final long SKUKEY_TIMEOUT = 24 * 60 * 60; } ``` ## 1.4 缓存常见问题 缓存最常见的3个问题: 面试 1. 缓存穿透 2. 缓存雪崩 3. 缓存击穿 **缓存穿透**: 是指查询一个不存在的数据,由于缓存无法命中,将去查询数据库,但是数据库也无此记录,并且出于容错考虑,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。 - 解决1 :空结果也进行缓存,但它的过期时间会很短,最长不超过五分钟,但是不能防止随机穿透。 - 解决2 :使用布隆过滤器来解决随机穿透问题。 ![img](assets/day06/wps1.jpg) **缓存雪崩**:是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。 - 解决1:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。 - 解决2:如果单节点宕机,可以采用集群部署方式防止雪崩 **缓存击穿**: 是指对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:如果这个key在大量请求同时进来之前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。 与缓存雪崩的区别: 1. 击穿是一个热点key失效 2. 雪崩是很多key集体失效 - 解决:锁 ![img](assets/day06/wps2.jpg) # 2、分布式锁 ## 2.1 本地锁的局限性 之前,我们学习过synchronized 及lock锁,这些锁都是本地锁。接下来写一个案例,演示本地锁的问题 ### 2.1.1 编写测试代码 在``service-product``中新建TestController中添加测试方法 ```java package com.atguigu.gmall.product.controller; @Api(tags = "测试接口") @RestController @RequestMapping("admin/product/test") public class TestController { @Autowired private TestService testService; @GetMapping("testLock") public Result testLock() { testService.testLock(); return Result.ok(); } } ``` 业务接口 ```java package com.atguigu.gmall.product.service; public interface TestService { void testLock(); } ``` 业务实现类 ```java package com.atguigu.gmall.product.service.impl; @Service public class TestServiceImpl implements TestService { @Autowired private StringRedisTemplate redisTemplate; @Override public void testLock() { // 查询Redis中的num值 String value = (String)this.redisTemplate.opsForValue().get("num"); // 没有该值return if (StringUtils.isBlank(value)){ return ; } // 有值就转成成int int num = Integer.parseInt(value); // 把Redis中的num值+1 this.redisTemplate.opsForValue().set("num", String.valueOf(++num)); } } ``` **说明**:通过reids客户端设置num=0 ### 2.1.2. 使用ab工具测试 登录虚拟机192.168.200.128,命令行中使用ab测试工具:httpd-tools(yum install -y httpd-tools)**已安装** > 命令语法:ab -n(一次发送的请求数) -c(请求的并发数) 访问路径 测试如下:5000请求,100并发 **注意:关闭本地Windows方法** ```shell ab -n 5000 -c 100 http://192.168.200.1:8206/admin/product/test/testLock ``` ![img](assets/day06/wps3.jpg) 查看Redis中的值: ![img](assets/day06/wps4.jpg) ### 2.1.3. 使用本地锁 ```java @Override public synchronized void testLock() { // 查询Redis中的num值 String value = (String)this.redisTemplate.opsForValue().get("num"); // 没有该值return if (StringUtils.isBlank(value)){ return ; } // 有值就转成成int int num = Integer.parseInt(value); // 把Redis中的num值+1 this.redisTemplate.opsForValue().set("num", String.valueOf(++num)); } ``` 使用ab工具压力测试:5000次请求,并发100 ![img](assets/day06/wps5.jpg) 查看Redis中的结果: ![img](assets/day06/wps6.jpg) 完美!与预期一致,是否真的完美? 接下来再看集群情况下,会怎样? ### 2.1.4. 本地锁问题演示锁 接下来启动8206 8216 8226 三个运行实例,运行多个`service-product`实例: server.port=8216 server.port=8226 ![img](assets/day06/wps7.jpg) 注意:bootstrap.properties 添加一个server.port = 8206; 将nacos的配置注释掉! 通过网关压力测试: 启动网关: ```http ab -n 5000 -c 100 http://192.168.200.1/admin/product/test/testLock ``` ![img](assets/day06/wps8.jpg) 查看Redis中的值: ![img](assets/day06/wps9.jpg) 集群情况下又出问题了!!! 以上测试,可以发现: 本地锁只能锁住同一工程内的资源,在分布式系统里面都存在局限性。 此时需要分布式锁。。 ## 2.2 分布式锁实现的解决方案 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题! 分布式锁主流的实现方案: 1. 基于数据库实现分布式锁 2. 基于缓存( Redis等) 3. 基于Zookeeper 每一种分布式锁解决方案都有各自的优缺点: 1. 性能:Redis最高 2. 可靠性:zookeeper最高 因为Redis具备高性能、高可用、高并发的特性,这里,我们就基于Redis实现分布式锁。 分布式锁的关键是**多进程共享的内存标记(锁)**,因此只要我们在Redis中放置一个这样的标记(数据)就可以了。不过在实现过程中,不要忘了我们需要实现下列目标: - **多进程可见**:多进程可见,否则就无法实现分布式效果 - **避免死锁**:死锁的情况有很多,我们要思考各种异常导致死锁的情况,保证锁可以被释放 尝试获取锁 成功:执行业务代码 执行业务 try(){业务代码-宕机} catch() finally{ 释放锁} 失败:等待;失效;下次 - **排它**:同一时刻,只能有一个进程获得锁 - **高可用**:避免锁服务宕机或处理好宕机的补救措施(redis集群架构:1.主从复制 2.哨兵 3.cluster集群) ## 2.3 使用Redis实现分布式锁 ![img](assets/day06/wps10.jpg) 1. 多个客户端同时获取锁(setnx) 2. 获取成功,执行业务逻辑{从db获取数据,放入缓存,执行完成释放锁(del) 3. 其他客户端等待重试 ### 2.3.1. 编写代码 ```java /** * 采用SpringDataRedis实现分布式锁 * 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key) */ @Override public void testLock() { //0.先尝试获取锁 setnx key val Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock"); if(flag){ //获取锁成功,执行业务代码 //1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0 String value = stringRedisTemplate.opsForValue().get("num"); //2.如果值为空则非法直接返回即可 if (StringUtils.isBlank(value)) { return; } //3.对num值进行自增加一 int num = Integer.parseInt(value); stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); //4.将锁释放 stringRedisTemplate.delete("lock"); }else{ try { Thread.sleep(100); this.testLock(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` 重启,服务集群,通过网关压力测试: ![img](assets/day06/wps11.jpg) 查看Redis中num的值: ![img](assets/day06/wps12.jpg) 基本实现。 问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放 解决:设置过期时间,自动释放锁。 ### 2.3.2. 优化之设置锁的过期时间 设置过期时间有两种方式: 1. 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放) 2. 在set时指定过期时间(推荐) ![img](assets/day06/wps13.jpg) 设置过期时间: ![img](assets/day06/wps14.jpg) 压力测试肯定也没有问题。自行测试 问题:可能会释放其他服务器的锁。 场景:如果业务逻辑的执行时间是7s。执行流程如下 1. index1业务逻辑没执行完,3秒后锁被自动释放。 2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。 3. index3获取到锁,执行业务逻辑 4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁, 导致index3的业务只执行1s就被别人释放。 最终等于没锁的情况。 解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁 ### 2.3.3. 优化之UUID防误删 ![img](assets/day06/wps15.jpg) ![img](assets/day06/wps16.jpg) 问题:删除操作缺乏原子性。 场景: 1. index1执行删除时,查询到的lock值确实和uuid相等 ![img](assets/day06/wps17.jpg) 2. index1执行删除前,lock刚好过期时间已到,被Redis自动释放 在Redis中没有了锁。 ![img](assets/day06/wps18.jpg) 3. index2获取了lock,index2线程获取到了cpu的资源,开始执行方法 4. index1执行删除,此时会把index2的lock删除 index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行 ![img](assets/day06/wps19.jpg) 删除的index2的锁! ### 2.3.4. 优化之LUA脚本保证删除的原子性 ```java /** * 采用SpringDataRedis实现分布式锁 * 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key) */ @Override public void testLock() { //0.先尝试获取锁 setnx key val //问题:锁可能存在线程间相互释放 //Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 10, TimeUnit.SECONDS); //解决:锁值设置为uuid String uuid = UUID.randomUUID().toString(); Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS); if(flag){ //获取锁成功,执行业务代码 //1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0 String value = stringRedisTemplate.opsForValue().get("num"); //2.如果值为空则非法直接返回即可 if (StringUtils.isBlank(value)) { return; } //3.对num值进行自增加一 int num = Integer.parseInt(value); stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); //4.将锁释放 判断uuid //问题:删除操作缺乏原子性。 //if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){ //线程一:判断是满足是当前线程锁的值 // //条件满足,此时锁正好到期,redis锁自动释放了线程2获取锁成功,线程1将线程2的锁删除 // stringRedisTemplate.delete("lock"); //} //解决:redis执行lua脚本保证原子,lua脚本执行会作为一个整体执行 //执行脚本参数 参数1:脚本对象封装lua脚本,参数二:lua脚本中需要key参数(KEYS[i]) 参数三:lua脚本中需要参数值 ARGV[i] //4.1 先创建脚本对象 DefaultRedisScript泛型脚本语言返回值类型 Long 0:失败 1:成功 DefaultRedisScript redisScript = new DefaultRedisScript<>(); //4.2设置脚本文本 String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" + "then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; redisScript.setScriptText(script); //4.3 设置响应类型 redisScript.setResultType(Long.class); stringRedisTemplate.execute(redisScript, Arrays.asList("lock"), uuid); }else{ try { //睡眠 Thread.sleep(100); //自旋重试 this.testLock(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` 问题: Redis集群状态下的问题: 1. 客户端A从master获取到锁 2. 在master将锁同步到slave之前,master宕掉了。 3. slave节点被晋级为master节点 4. 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。 安全失效! 解决方案:了解即可! ![img](assets/day06/wps20.jpg) ### 2.3.5. 总结 1、加锁 ```java // 1. 从Redis中获取锁,set k1 v1 px 20000 nx String uuid = UUID.randomUUID().toString(); Boolean lock = this.redisTemplate.opsForValue() .setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS); ``` 2、使用lua释放锁 ```java // 2. 释放锁 del String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; // 设置lua脚本返回的数据类型 DefaultRedisScript redisScript = new DefaultRedisScript<>(); // 设置lua脚本返回类型为Long redisScript.setResultType(Long.class); redisScript.setScriptText(script); redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid); ``` 3、重试 ```java Thread.sleep(500); testLock(); ``` 为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: - 互斥性。在任意时刻,只有一个客户端能持有锁。 - 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。 - 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。 - 加锁和解锁必须具有原子性 ## 2.4 使用Redisson 解决分布式锁 ​ Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。 官方文档地址:https://github.com/Redisson/Redisson/wiki Github 地址:https://github.com/Redisson/Redisson ### 2.4.1 实现代码 1. `service-util`模块导入依赖 ```xml org.Redisson Redisson 3.15.3 ``` 2. 配置Redisson客户端对象 ```java package com.atguigu.gmall.common.config; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SingleServerConfig; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; @Configuration public class RedissonConfig { private static String ADDRESS_PREFIX = "redis://%s:%d"; private int timeout = 3000; @Bean public RedissonClient redissonClient(RedisProperties prop) { if (StringUtils.isEmpty(prop.getHost())) { throw new RuntimeException("host is empty"); } Config config = new Config(); SingleServerConfig singleServerConfig = config.useSingleServer() .setAddress(String.format(ADDRESS_PREFIX, prop.getHost(), prop.getPort())) .setTimeout(timeout); if (!StringUtils.isEmpty(prop.getPassword())) { singleServerConfig.setPassword(prop.getPassword()); } return Redisson.create(config); } } ``` **注意**:这里读取了一个名为RedisProperties的属性,因为我们引入了SpringDataRedis,Spring已经自动加载了RedisProperties,并且读取了配置文件中的Redis信息。 3. 修改实现类 ```java @Autowired private RedissonClient redissonClient; /** * 使用Redison实现分布式锁 * 开发步骤: * 1.使用RedissonClient客户端对象 创建锁对象 * 2.调用获取锁方法 * 3.执行业务逻辑 * 4.将锁释放 * */ public void testLock() { //0.创建锁对象 RLock lock = redissonClient.getLock("lock1"); //0.1 尝试加锁 //0.1.1 lock() 阻塞等待一直到获取锁,默认锁有效期30s lock.lock(); //1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0 String value = stringRedisTemplate.opsForValue().get("num"); //2.如果值为空则非法直接返回即可 if (StringUtils.isBlank(value)) { return; } //3.对num值进行自增加一 int num = Integer.parseInt(value); stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); //4.将锁释放 lock.unlock(); } ``` ### 2.4.2 可重入锁(Reentrant Lock) 基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。 大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。 另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。 快速入门使用的就是可重入锁。也是最常使用的锁。 最常见的使用: ```java @Autowired private RedissonClient redissonClient; /** * 使用Redison实现分布式锁 * 开发步骤: * 1.使用RedissonClient客户端对象 创建锁对象 * 2.调用获取锁方法 * 3.执行业务逻辑 * 4.将锁释放 */ public void testLock() { //0.创建锁对象 RLock lock = redissonClient.getLock("lock1"); try { //0.1 尝试加锁 //0.1.1 lock() 阻塞等待一直到获取锁,默认锁有效期30s //lock.lock(); //0.1.2 lock(数字,时间单位) 指定获取锁成功有效期,直到获取锁成功。到期自动释放 //lock.lock(10, TimeUnit.SECONDS); //0.1.3 tryLock(等待获取锁时间,锁的有效期,时间单位) 指定时间内如果获取锁成功,返回true 执行后续业务,如果超过等待时间,返回false boolean flag = lock.tryLock(2, 10, TimeUnit.SECONDS); if (flag) { System.out.println(Thread.currentThread().getName() + "线程获取锁成功"); //获取成功 执行业务 //1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0 String value = stringRedisTemplate.opsForValue().get("num"); //2.如果值为空则非法直接返回即可 if (StringUtils.isBlank(value)) { return; } //3.对num值进行自增加一 int num = Integer.parseInt(value); stringRedisTemplate.opsForValue().set("num", String.valueOf(++num)); //测试可重入锁 this.check(); //4.将锁释放 lock.unlock(); } else { //本地获取锁失败 Thread.sleep(100); this.testLock(); } } catch (InterruptedException e) { e.printStackTrace(); //如果执行异常,释放锁 lock.unlock(); } } /** * 测试可重入 */ private void check() { //尝试获取锁 RLock lock = redissonClient.getLock("lock1"); lock.lock(); //执行业务 System.out.println(Thread.currentThread().getName() + "可重入获取成功"); //释放锁 lock.unlock(); } ``` 看门狗原理: > 1、如果我们指定了锁的超时时间,就发送给Redis执行脚本,进行占锁,默认超时就是我们制定的时间,不会自动续期; > 2、如果我们未指定锁的超时时间,就使用 `lockWatchdogTimeout = 30 * 1000` 【看门狗默认时间】 改造程序: ![img](assets/day06/wps21.jpg) 重启后在浏览器测试: ### 2.4.3 读写锁(ReadWriteLock) 基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。 分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。 ```java RReadWriteLock rwlock = Redisson.getReadWriteLock("anyRWLock"); // 最常见的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock(); // 10秒钟以后自动解锁 // 无需调用unlock方法手动解锁 rwlock.readLock().lock(10, TimeUnit.SECONDS); // 或 rwlock.writeLock().lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); // 或 boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock(); ``` 代码实现 TestController ```java /** * 读数据接口 * @return */ @GetMapping("/read") public Result read(){ String msg = testService.read(); return Result.ok(msg); } /** * 写数据接口 * @return */ @GetMapping("/write") public Result write(){ testService.write(); return Result.ok("写入数据成功"); } ``` TestService接口 ```java /** * 读数据接口 * @return */ String read(); /** * 写数据接口 * @return */ void write(); ``` TestServiceImpl实现类 ```java /** * 从Redis中读数据 * * @return */ @Override public String read() { //1.创建读写锁对象 RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock"); //2.获取读锁 RLock lock = readWriteLock.readLock(); //加锁 给锁的有效期设置为10s lock.lock(10, TimeUnit.SECONDS); //3.执行从redis获取数据业务 String msg = stringRedisTemplate.opsForValue().get("msg"); //4.释放读锁 //lock.unlock(); return msg; } /** * 将数据写入redis * * @return */ @Override public void write() { //1.创建读写锁对象 RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock"); //2.获取写锁 RLock lock = readWriteLock.writeLock(); //加锁 给锁的有效期设置为10s lock.lock(10, TimeUnit.SECONDS); //3.业务将数据写入redis stringRedisTemplate.opsForValue().set("msg", "msgData"); //4.释放写锁 //lock.unlock(); } ``` 打开两个浏览器窗口测试: http://localhost:8206/admin/product/test/read http://localhost:8206/admin/product/test/write - 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始 - 同时访问读:不用等待 - 先写后读:读要等待(约10s)写完成 - 先读后写:写要等待(约10s)读完成 # 3、分布式锁改造获取sku信息 ### 3.1 使用Redis实现分布式锁 RedisConst 类中追加一个变量 ```java // 商品如果在数据库中不存在那么会缓存一个空对象进去,但是这个对象是没有用的,所以这个对象的过期时间应该不能太长, // 如果太长会占用内存。 // 定义变量,记录空对象的缓存过期时间 public static final long SKUKEY_TEMPORARY_TIMEOUT = 10 * 60; ``` **在实现类中引入** ```java @Autowired private RedisTemplate redisTemplate; /** * 根据SkuID查询SKU商品信息包含图片列表 * * @param skuId * @return */ @Override public SkuInfo getSkuInfo(Long skuId) { //改造前从数据库直接查询 //SkuInfo skuInfo = getSkuInfoDB(skuId); //改造1:缓存+分布式锁(RedisTemplate) SkuInfo skuInfo = getSkuInfoRedis(skuId); return skuInfo; } /** * 采用SpringDataRedis提供实现分布锁 * 优先从缓存中获取商品信息,缓存未命中,避免出现缓存击穿,分布式锁 * * @param skuId * @return */ private SkuInfo getSkuInfoRedis(Long skuId) { try { //1.优先从缓存中获取数据,如果命中缓存则直接返回,未命中-采用分布式锁避免缓存击穿 //1.1 构建商品详情key-缓存商品信息Key 形式: sku:29:info String skuKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKUKEY_SUFFIX; //1.2 根据key查询缓存中商品数据 SkuInfo skuInfo = (SkuInfo) redisTemplate.opsForValue().get(skuKey); if (skuInfo == null) { //2.尝试获取锁 获取锁成功-执行数据库查询,有值:将查询结果放入缓存 没值:缓存空对象(暂存) //2.1 为每个查询商品构建商品SKU锁Key值 形式:sku:29:lock sku:30:lock String lockKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKULOCK_SUFFIX; //2.2 构建锁的值UUID String uuid = UUID.randomUUID().toString().replace("-", ""); //2.3 调用redis执行set key val ex 10 nx 获取锁 Boolean flag = redisTemplate.opsForValue().setIfAbsent(lockKey, uuid, RedisConst.SKULOCK_EXPIRE_PX1, TimeUnit.SECONDS); //2.3.1 获取锁成功,执行业务(查询数据库,放入缓存) if (flag) { //2.4 执行业务:查询数据库获取商品信息 skuInfo = this.getSkuInfoDB(skuId); //2.4.1 数据库中本身不存在 短时间缓存空对象,返回空对象 if (skuInfo == null) { redisTemplate.opsForValue().set(skuKey, skuInfo, RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS); return skuInfo; } //2.4.2 数据库中有数据,将数据加入缓存 redisTemplate.opsForValue().set(skuKey, skuInfo, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS); //2.5 将锁释放掉 保证删除原子性采用lua脚本 String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" + "then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; DefaultRedisScript redisScript = new DefaultRedisScript<>(); redisScript.setScriptText(script); redisScript.setResultType(Long.class); redisTemplate.execute(redisScript, Arrays.asList(lockKey), uuid); return skuInfo; } else { //2.3.2 获取锁失败,自旋等待下次获取 Thread.sleep(100); return this.getSkuInfoRedis(skuId); } } else { //命中缓存直接返回即可 return skuInfo; } } catch (Exception e) { e.printStackTrace(); } //兜底方案:查询数据库 return getSkuInfoDB(skuId); } ``` ### 3.2 使用Redisson实现分布式锁 **在实现类添加** ```java /** * 根据SkuID查询SKU商品信息包含图片列表 * * @param skuId * @return */ @Override public SkuInfo getSkuInfo(Long skuId) { //改造前从数据库直接查询 //SkuInfo skuInfo = getSkuInfoDB(skuId); //改造后:缓存+分布式锁(RedisTemplate方式) //SkuInfo skuInfo = getSkuInfoRedis(skuId); //改造后:缓存+分布式锁(Redisson方式) SkuInfo skuInfo = getSkuInfoRedisson(skuId); return skuInfo; } /** * 采用Redisson提供实现分布锁 * 优先从缓存中获取商品信息,缓存未命中,避免出现缓存击穿,分布式锁 * * @param skuId * @return */ private SkuInfo getSkuInfoRedisson(Long skuId) { try { //1.优先从缓存中获取数据,如果命中缓存则直接返回,未命中-采用分布式锁避免缓存击穿 //1.1 构建商品商品信息Key 形式:sku:29:info String skuKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKUKEY_SUFFIX; //1.2 从缓存中获取商品信息 结果 命中缓存:直接返回 未命中 执行第二步 SkuInfo skuInfo = (SkuInfo) redisTemplate.opsForValue().get(skuKey); if (skuInfo == null) { //2.尝试获取锁 获取锁成功-执行数据库查询,有值:将查询结果放入缓存 没值:缓存空对象(暂存) //2.1 为每个商品声明锁的Key 形式:sku:29:lock String lockKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKULOCK_SUFFIX; //2.2创建可重入锁对象 RLock lock = redissonClient.getLock(lockKey); //2.3尝试获取锁 结果 获取锁成功:执行业务(查询数据库 有值:放入缓存 没值:缓存空对象) 释放锁 boolean flag = lock.tryLock(RedisConst.SKULOCK_EXPIRE_PX1, RedisConst.SKULOCK_EXPIRE_PX2, TimeUnit.SECONDS); //2.3.1 获取锁成功 查库缓存 释放锁 if (flag) { try { //2.3.2 查询数据库 skuInfo = this.getSkuInfoDB(skuId); //2.3.2.1 没值 缓存空对象 if (skuInfo == null) { redisTemplate.opsForValue().set(skuKey, skuInfo, RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS); return skuInfo; } //2.3.2.2 有值:将数据放入缓存 redisTemplate.opsForValue().set(skuKey, skuInfo, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } finally { //2.3.3 释放锁 lock.unlock(); } return skuInfo; } else { //2.2.2 获取锁失败 自旋下次重试 Thread.sleep(200); return this.getSkuInfoRedisson(skuId); } } else { //命中缓存则直接返回 return skuInfo; } } catch (Exception e) { e.printStackTrace(); } //兜底方案:查询数据库 return this.getSkuInfoDB(skuId); } ``` ### 3.3 在getSkuInfo 中调用上述两个方法进行测试 ```java /** * 根据SkuID查询SKU商品信息包含图片列表 * * @param skuId * @return */ @Override public SkuInfo getSkuInfo(Long skuId) { //改造前从数据库直接查询 //SkuInfo skuInfo = getSkuInfoDB(skuId); //改造后:缓存+分布式锁(RedisTemplate方式) //SkuInfo skuInfo = getSkuInfoRedis(skuId); //改造后:缓存+分布式锁(Redisson方式) SkuInfo skuInfo = getSkuInfoRedisson(skuId); return skuInfo; } ``` # 4、分布式锁 + AOP实现缓存 随着业务中缓存及分布式锁的加入,业务代码变的复杂起来,除了需要考虑业务逻辑本身,还要考虑缓存及分布式锁的问题,增加了程序员的工作量及开发难度。而缓存的玩法套路特别类似于事务,而声明式事务就是用了aop的思想实现的。 ![img](assets/day06/wps22.jpg) 1. 以 @Transactional 注解为植入点的切点,这样才能知道@Transactional注解标注的方法需要被代理。 2. @Transactional注解的切面逻辑类似于@Around 模拟事务,缓存可以这样实现: 自定义缓存注解@GmallCache(类似于事务@Transactional) 2. 编写切面类,使用环绕通知实现缓存的逻辑封装 ![img](assets/day06/wps23.jpg) ### 4.1 定义一个注解 在`service-util`模块中定义缓存注解 ```java package com.atguigu.gmall.common.cache; import java.lang.annotation.*; /** * 自定义注解 * 作用:优先从缓存中获取数据+避免缓存击穿采用分布式锁 * 用来修饰注解的注解称为:元注解 * 1.Target 注解可以被修饰位置 TYPE:类 METHOD:方法 FIELD:属性 * 2.Retention 注解生命周期 * 3.Inherited 注解是否可以被继承 * 4.Documented 产生文档 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface GmallCache { /** * 缓存数据前缀 * * @return */ String prefix() default "cache:"; /** * 缓存数据后缀 * * @return */ String suffix() default ":info"; } ``` ### 4.2 定义一个切面类加上注解 Spring aop 参考文档: https://docs.spring.io/spring-framework/docs/5.3.9-SNAPSHOT/reference/html/core.html#aop ![img](assets/day06/wps24.jpg) ![img](assets/day06/wps25.jpg) ```java package com.atguigu.gmall.common.cache; import com.alibaba.fastjson.JSON; import com.atguigu.gmall.common.constant.RedisConst; import com.sun.org.apache.regexp.internal.RE; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.Signature; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * 缓存+分布式锁切面类 * * @author: atguigu * @create: 2023-01-03 09:33 */ @Component @Aspect @Slf4j public class GmallCacheAspect { @Autowired private RedisTemplate redisTemplate; @Autowired private RedissonClient redissonClient; /** * 缓存+分布式锁切面逻辑 * * @param joinPoint 切入点(真正被增强位置)包含目标方法各种信息 * @return */ @SneakyThrows //lombok提供处理异常try catch @Around("@annotation(com.atguigu.gmall.common.cache.GmallCache)") public Object cacheAroundAdvice(ProceedingJoinPoint joinPoint) { Object resultObject = null; //一. 前置通知 try { log.info("前置通知"); //1.优先从缓存中获取数据,如果命中缓存则直接返回,未命中-采用分布式锁避免缓存击穿 //1.1 声明缓存数据key 形式:前缀+数据ID+后缀 //1.1.1 通过切入点对象joinPoint获取相关注解 MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); //1.1.2 获取方法对象,从方法上获取注解 从注解中获取属性值 GmallCache gmallCache = methodSignature.getMethod().getAnnotation(GmallCache.class); //1.1.2 从切入点对象中获取目标方法参数 Object[] args = joinPoint.getArgs(); String argsStr = Arrays.asList(args).stream().map(arg -> { return arg.toString(); //将切入点方法如果存在多参数用|拼接 }).collect(Collectors.joining("|")); String dataKey = gmallCache.prefix() + argsStr + gmallCache.suffix(); //1.2 从缓存中获取数据 结果 命中:直接返回 未命中:执行2.获取锁方法 resultObject = this.cacheHit(dataKey, methodSignature); if (resultObject == null) { //2.尝试获取锁 获取锁成功-执行数据库查询,有值:将查询结果放入缓存 没值:缓存空对象(暂存) //2.1 为每个业务数据声明锁的Key 形式:前缀+数据ID+后缀 String lockKey = gmallCache.prefix() + argsStr + RedisConst.SKULOCK_SUFFIX; //2.2 创建可重入锁对象 RLock lock = redissonClient.getLock(lockKey); //2.3 尝试获取锁 结果 获取到锁:执行目标方法(查询数据)将查询结果加入缓存 没有获取到锁:自旋 boolean flag = lock.tryLock(RedisConst.SKULOCK_EXPIRE_PX1, RedisConst.SKULOCK_EXPIRE_PX2, TimeUnit.SECONDS); if (flag) { try { //二. 执行目标方法 //2.3.1 获取锁成功 执行查询数据 将数据放入缓存 释放锁 resultObject = joinPoint.proceed(joinPoint.getArgs()); if (resultObject == null) { //2.3.1.1 库中不存在数据,缓存空对象,返回空对象 redisTemplate.opsForValue().set(dataKey, JSON.toJSONString(resultObject), RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS); return resultObject; } else { //2.3.1.2 将数据放入缓存 释放锁 返回数据 redisTemplate.opsForValue().set(dataKey, JSON.toJSONString(resultObject), RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS); return resultObject; } } finally { //2.3.2 释放锁 lock.unlock(); } } else { //2.3.2 没有获取到锁:自旋 Thread.sleep(100); return this.cacheAroundAdvice(joinPoint); } //2.4 将锁释放 } else { // 命中缓存直接返回即可 return resultObject; } } catch (Throwable throwable) { throwable.printStackTrace(); } //三.后置通知 log.info("后置通知"); return joinPoint.proceed(joinPoint.getArgs()); } /** * 从Redis缓存中获取数据 * * @param dataKey 缓存业务数据key * @param methodSignature 切入点方法 */ private Object cacheHit(String dataKey, MethodSignature methodSignature) { //1.直接从redis中获取业务数据 缓存中无论是List 对象 都存JSON字符串 可能:SkuInfo List String dataStr = (String) redisTemplate.opsForValue().get(dataKey); if (StringUtils.isNotBlank(dataStr)) { //2.对缓存结果进行判断 //2.1命中缓存 将JSON字符串 反序列化为对应类型 Class returnType = methodSignature.getReturnType(); return JSON.parseObject(dataStr, returnType); } //2.2没有命中 return null; } } ``` ### 4.3 使用注解完成缓存 ```java /** * 根据SkuID查询SKU商品信息包含图片列表 * * @param skuId * @return */ @Override @GmallCache(prefix = "sku:", suffix = ":info") public SkuInfo getSkuInfo(Long skuId) { return getSkuInfoDB(skuId); } /** * 通过三级分类id查询分类信息 * @param category3Id * @return */ @Override @GmallCache(prefix = "categoryView:") public BaseCategoryView getCategoryViewByCategory3Id(Long category3Id) { .... } /** * 根据商品SPUID查询商品海报列表 * * @param spuId 商品SPUID * @return */ @Override @GmallCache(prefix = "spuPosterBySpuId:") public List findSpuPosterBySpuId(Long spuId) { // select * from spu_poster where spu_id = spuId; return spuPosterMapper.selectList(new QueryWrapper().eq("spu_id",spuId)); } /** * 根据SKUID查询当前SKU商品中平台属性以及属性值 * * @param skuId * @return */ @Override @GmallCache(prefix = "attrList:") public List getAttrList(Long skuId) { /** * 根据SPU_ID,SKU_ID查询所有销售属性,标识SKU选中销售属性 * * @param skuId * @param spuId * @return */ @Override @GmallCache(prefix = "spuSaleAttrListCheckBySku:") public List getSpuSaleAttrListCheckBySku(Long skuId, Long spuId) { .... } /** * 返回所有销售属性对应skuID map集合 * * @param spuId * @return */ @Override @GmallCache(prefix = "skuValueIdsMap:") public Map getSkuValueIdsMap(Long spuId) { .... } /** * 根据商品实时最新价格 * * @param skuId 商品ID * @return */ @Override public BigDecimal getSkuPrice(Long skuId) { //1.避免出现缓存击穿 //1.1 构建锁的key String lockKey = "sku:price:" + skuId + ":lock"; //1.2 创建锁对象 RLock lock = redissonClient.getLock(lockKey); try { //1.3 获取锁 lock.lock(); //select price from sku_info where id = 29; LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); //设置查询条件 queryWrapper.eq(SkuInfo::getId, skuId); //设置查询字段 queryWrapper.select(SkuInfo::getPrice); SkuInfo skuInfo = skuInfoMapper.selectOne(queryWrapper); if (skuInfo != null) { return skuInfo.getPrice(); } return new BigDecimal("0"); } finally { //1.4 释放锁 lock.unlock(); } } ``` # 5、布隆过滤器 ## 5.1 布隆过滤器原理 ### 5.1.1 什么是布隆过滤器 布隆过滤器(Bloom Filter),是1970年,由一个叫布隆的小伙子提出的,距今已经五十年了。它实际上是一个很长的[二进制](https://baike.baidu.com/item/二进制/361457)向量和一系列随机映射函数。布隆过滤器可以用于[检索](https://baike.baidu.com/item/检索/11003896)一个元素是否在一个[集合](https://baike.baidu.com/item/集合/2908117)中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。二进制大家应该都清楚,存储的数据不是0就是1,默认是0。 主要用于判断一个元素是否在一个集合中,0代表不存在某个数据,1代表存在某个数据。 总结: 判断一个元素一定不存在 或者 可能存在! 存在一定的误判率{通过代码调节} Bit 数组: | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | | | | | | | | | | ### 5.1.2 布隆过滤器使用场景 大数据量的时候, 判断一个元素是否在一个集合中。解决缓存穿透问题 ### 5.1.3 原理 存入过程 布隆过滤器上面说了,就是一个二进制数据的集合。当一个数据加入这个集合时,经历如下: 通过K个哈希函数计算该数据,返回K个计算出的hash值 这些K个hash值映射到对应的K个二进制的数组下标 将K个下标对应的二进制数据改成1。 如图所示: ![img](assets/day06/wps26.jpg) 查询过程 布隆过滤器主要作用就是查询一个数据,在不在这个二进制的集合中,查询过程如下: 1、通过K个哈希函数计算该数据,对应计算出的K个hash值 2、通过hash值找到对应的二进制的数组下标 3、判断:如果存在一处位置的二进制数据是0,那么该数据不存在。如果都是1,该数据**可能**存在集合中。 ### 5.1.4 布隆过滤器的优缺点 优点 1. 由于存储的是二进制数据,所以占用的空间很小 2. 它的插入和查询速度是非常快的,时间复杂度是O(K),空间复杂度:O (M)。 - K: 是哈希函数的个数 - M: 是二进制位的个数 3. 保密性很好,因为本身不存储任何原始数据,只有二进制数据 缺点 1. 添加数据是通过计算数据的hash值,那么很有可能存在这种情况:两个不同的数据计算得到相同的hash值。 ![img](assets/day06/wps27.jpg) 例如图中的“张三”和“张三丰”,假如最终算出hash值相同,那么他们会将同一个下标的二进制数据改为1。 这个时候,你就不知道下标为1的二进制,到底是代表“张三”还是“张三丰”。 由此得出如下缺点: 一、存在误判 假如上面的图没有存 "张三",只存了 "张三丰",那么用"张三"来查询的时候,会判断"张三"存在集合中。 因为“张三”和“张三丰”的hash值是相同的,通过相同的hash值,找到的二进制数据也是一样的,都是1。 误判率: ​ 受三个因素影响: 二进制位的个数m, 哈希函数的个数k, 数据规模n (添加到布隆过滤器中的函数) ![img](assets/day06/wps28.jpg) 已知误判率p, 数据规模n, 求二进制的个数m,哈希函数的个数k {m,k 程序会自动计算 ,你只需要告诉我数据规模,误判率就可以了} ![img](assets/day06/wps29.jpg) ln: 自然对数是以常数e为[底数](https://baike.baidu.com/item/底数/5416651)的[对数](https://baike.baidu.com/item/对数/91326),记作lnN(N>0)。在物理学,生物学等自然科学中有重要的意义,一般表示方法为lnx。数学中也常见以logx表示自然对数。 二、删除困难 还是用上面的举例,因为“张三”和“张三丰”的hash值相同,对应的数组下标也是一样的。 如果你想去删除“张三”,将下标为1里的二进制数据,由1改成了0。 那么你是不是连“张三丰”都一起删了呀。 ## 5.2 实现方式 ### 5.2.1 初始化skuId的布隆过滤器 RedisConst 常量类中添加,使用Redis中数据结构:bitmap ```java // 布隆过滤器使用! public static final String SKU_BLOOM_FILTER="sku:bloom:filter"; ``` 操作模块:`service-product` 修改启动类 ```java package com.atguigu.gmall; import com.atguigu.gmall.common.constant.RedisConst; import org.redisson.api.RBloomFilter; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.context.annotation.ComponentScan; /** * @author: atguigu * @create: 2022-12-21 14:39 */ @SpringBootApplication @EnableDiscoveryClient //@ComponentScan(basePackages = {"com.xxx"}) public class ProductApp implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(ProductApp.class, args); } @Autowired private RedissonClient redissonClient; /** * Springboot应用初始化后会执行一次该方法 * @param args * @throws Exception */ @Override public void run(String... args) throws Exception { //初始化布隆过滤器 RBloomFilter bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER); //设置数据规模 误判率 预计统计元素数量为100000,期望误差率为0.01 bloomFilter.tryInit(100000, 0.01); } } ``` SpringBoot的CommandLineRunner接口主要用于实现在应用初始化后,去执行一段代码块逻辑,这段初始化代码在整个应用生命周期内只会执行一次。 ### 5.2.2 给商品详情页添加布隆过滤器 1、查看商品详情页添加布隆过滤器 操作模块:`service-item` 更改ItemServiceImpl.getBySkuId方法 ```java @Autowired private RedissonClient redissonClient; /** * 远程调用商品微服务:根据skuID汇总sku商品详情页所有数据 * * @param skuId * @return */ @Override public Map getBySkuId(Long skuId) { //0.创建响应结果Map Map result = new HashMap<>(); //远程调用商品微服务7个接口之前 提前知道用户访问商品SKUID是否存在与布隆过滤器 RBloomFilter bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER); if (!bloomFilter.contains(skuId)) { log.error("用户查询商品sku不存在:{}", skuId); //查询数据不存在直接返回空对象 return result; } ... } ``` 2、添加商品sku加入布隆过滤器数据 操作模块:`service-product` 更改SkuManageServiceImpl.saveSkuInfo方法 ```java /** * 保存SKU信息 * 1.将SKU基本信息存入sku_info表中 * 2.将提交SKU图片存入sku_image表 关联SKU 设置sku_id逻辑外键 * 3.将提交的平台属性列表 批量保存 sku_attr_value 关联SKU 设置sku_id逻辑外键 * 4.将提交的销售属性列表 批量保存 sku_sale_attr_value 关联SKU 设置sku_id逻辑外键 * * @param skuInfo SKU相关信息 */ public void saveSkuInfo(SkuInfo skuInfo) { /* 将数据插入数据库表! skuInfo: 库存单元表! 记录是哪个spu下的sku! skuImage: 库存单元图片表! skuSaleAttrValue; sku 与 销售属性值中间表! skuAttrValue: sku 与平台属性值中间表! */ skuInfoMapper.insert(skuInfo); ... //将新增的商品SKUID存入布隆过滤器 //5. 获取布隆过滤器,将新增skuID存入布隆过滤器 RBloomFilter bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER); bloomFilter.add(skuInfo.getId()); } ```