第6章-商品详情进阶优化.md 54 KB

第6章-商品详情进阶优化

学习目标:

  • 商品详情页优化方案
  • 掌握Redis实现分布式锁方式
  • 掌握Redisson实现Redis分布式锁
  • 分布式锁+AOP实现缓存
  • 布隆过滤器的使用场景及应用

1、商品详情页面优化

1.1 思路

虽然咱们实现了页面需要的功能,但是考虑到该页面是被用户高频访问的,所以性能需要优化。一般一个系统最大的性能瓶颈,就是数据库的io操作。从数据库入手也是调优性价比最高的切入点。

一般分为两个层面:

  • 一是提高数据库sql本身的性能
  • 二是尽量避免直接查询数据库。

重点要讲的是另外一个层面:尽量避免直接查询数据库。

解决办法就是:缓存

1.2 整合Redis到工程

由于Redis作为缓存数据库,要被多个项目使用,所以要制作一个通用的工具类,方便工程中的各个模块使用。

而主要使用Redis的模块,都是后台服务的模块,service工程。所以咱们把Redis的工具类放到service-util模块中,这样所有的后台服务模块都可以使用Redis。

1.2.1 首先在service-util引入依赖包

搭建环境时,已经导入.

<!-- Redis -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- spring2.X集成Redis所需common-pool2-->
<dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-pool2</artifactId>
   <version>2.6.0</version>
</dependency>

1.2.2 添加Redis配置类

service-util配置自定义RedisTemplate对象-设置Key,Val的序列化方式

package com.atguigu.gmall.common.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching
public class RedisConfig {

    @Primary
    @Bean
    public RedisTemplate<Object, Object> RedisTemplate(RedisConnectionFactory RedisConnectionFactory) {
        RedisTemplate<Object, Object> RedisTemplate = new RedisTemplate<>();
        RedisTemplate.setConnectionFactory(RedisConnectionFactory);

        //使用Jackson2JsonRedisSerialize 替换默认序列化(默认采用的是JDK序列化) 对存储的对象进行JSON序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        // 序列化key value
        RedisTemplate.setKeySerializer(new StringRedisSerializer());
        RedisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        RedisTemplate.setHashKeySerializer(new StringRedisSerializer());
        RedisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        RedisTemplate.afterPropertiesSet();
        return RedisTemplate;
    }
}

image-20221228191445639

说明:由于service-util属于公共模块,所以我们把它引入到service父模块,其他service子模块都自动引入了

1.3 Redis业务开发相关规则

开始开发先说明Redis key的命名规范,由于Redis不像数据库表那样有结构,其所有的数据全靠key进行索引,所以Redis数据的可读性,全依靠key。

企业中最常用的方式就是:

object:id:field

比如:sku:1314:info

​ user:1092:info

: 表示根据windows的 / 一个意思

在RedisConst中定义Redis的常量,RedisConst类在service-util模块中,所有的Redis常量我们都配置在这里

package com.atguigu.gmall.common.constant;

/**
 * Redis常量配置类
 *
 */
public class RedisConst {

    public static final String SKUKEY_PREFIX = "sku:";
    public static final String SKUKEY_SUFFIX = ":info";
    //单位:秒
    public static final long SKUKEY_TIMEOUT = 24 * 60 * 60;

}

1.4 缓存常见问题

缓存最常见的3个问题: 面试

  1. 缓存穿透

  2. 缓存雪崩

  3. 缓存击穿

缓存穿透: 是指查询一个不存在的数据,由于缓存无法命中,将去查询数据库,但是数据库也无此记录,并且出于容错考虑,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。

  • 解决1 :空结果也进行缓存,但它的过期时间会很短,最长不超过五分钟,但是不能防止随机穿透。

  • 解决2 :使用布隆过滤器来解决随机穿透问题。

img

缓存雪崩:是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

  • 解决1:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

  • 解决2:如果单节点宕机,可以采用集群部署方式防止雪崩

缓存击穿: 是指对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:如果这个key在大量请求同时进来之前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。

与缓存雪崩的区别:

  1. 击穿是一个热点key失效

  2. 雪崩是很多key集体失效

  • 解决:锁

img

2、分布式锁

2.1 本地锁的局限性

之前,我们学习过synchronized 及lock锁,这些锁都是本地锁。接下来写一个案例,演示本地锁的问题

2.1.1 编写测试代码

service-product中新建TestController中添加测试方法

package com.atguigu.gmall.product.controller;


@Api(tags = "测试接口")
@RestController
@RequestMapping("admin/product/test")
public class TestController {
    
    @Autowired
    private TestService testService;

    @GetMapping("testLock")
    public Result testLock() {
        testService.testLock();
        return Result.ok();
    }
}

业务接口

package com.atguigu.gmall.product.service;

public interface TestService {

   void testLock();

}

业务实现类

package com.atguigu.gmall.product.service.impl;
@Service
public class TestServiceImpl implements TestService {

   @Autowired
   private StringRedisTemplate redisTemplate;
   @Override
   public void testLock() {
      // 查询Redis中的num值
      String value = (String)this.redisTemplate.opsForValue().get("num");
      // 没有该值return
      if (StringUtils.isBlank(value)){
         return ;
      }
      // 有值就转成成int
      int num = Integer.parseInt(value);
      // 把Redis中的num值+1
      this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
   }
}

说明:通过reids客户端设置num=0

2.1.2. 使用ab工具测试

登录虚拟机192.168.200.128,命令行中使用ab测试工具:httpd-tools(yum install -y httpd-tools)已安装

命令语法:ab -n(一次发送的请求数) -c(请求的并发数) 访问路径

测试如下:5000请求,100并发 注意:关闭本地Windows防火墙

ab  -n 5000 -c 100 http://192.168.200.1:8206/admin/product/test/testLock

img

查看Redis中的值:

img

2.1.3. 使用本地锁

@Override
public synchronized void testLock() {
   // 查询Redis中的num值
   String value = (String)this.redisTemplate.opsForValue().get("num");
   // 没有该值return
   if (StringUtils.isBlank(value)){
      return ;
   }
   // 有值就转成成int
   int num = Integer.parseInt(value);
   // 把Redis中的num值+1
   this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
}

使用ab工具压力测试:5000次请求,并发100

img

查看Redis中的结果:

img

完美!与预期一致,是否真的完美?

接下来再看集群情况下,会怎样?

2.1.4. 本地锁问题演示锁

接下来启动8206 8216 8226 三个运行实例,运行多个service-product实例:

server.port=8216

server.port=8226

img

注意:bootstrap.properties 添加一个server.port = 8206; 将nacos的配置注释掉!

通过网关压力测试:

启动网关:

ab -n 5000 -c 100 http://192.168.200.1/admin/product/test/testLock

img

查看Redis中的值:

img

集群情况下又出问题了!!!

以上测试,可以发现:

本地锁只能锁住同一工程内的资源,在分布式系统里面都存在局限性。

此时需要分布式锁。。

2.2 分布式锁实现的解决方案

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁主流的实现方案:

  1. 基于数据库实现分布式锁

  2. 基于缓存( Redis等)

  3. 基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

  1. 性能:Redis最高

  2. 可靠性:zookeeper最高

因为Redis具备高性能、高可用、高并发的特性,这里,我们就基于Redis实现分布式锁。

分布式锁的关键是多进程共享的内存标记(锁),因此只要我们在Redis中放置一个这样的标记(数据)就可以了。不过在实现过程中,不要忘了我们需要实现下列目标:

  • 多进程可见:多进程可见,否则就无法实现分布式效果

  • 避免死锁:死锁的情况有很多,我们要思考各种异常导致死锁的情况,保证锁可以被释放

尝试获取锁

成功:执行业务代码 执行业务 try(){业务代码-宕机} catch() finally{ 释放锁}

失败:等待;失效;下次

  • 排它:同一时刻,只能有一个进程获得锁

  • 高可用:避免锁服务宕机或处理好宕机的补救措施(redis集群架构:1.主从复制 2.哨兵 3.cluster集群)

2.3 使用Redis实现分布式锁

img

  1. 多个客户端同时获取锁(setnx)

  2. 获取成功,执行业务逻辑{从db获取数据,放入缓存,执行完成释放锁(del)

  3. 其他客户端等待重试

2.3.1. 编写代码

/**
 * 采用SpringDataRedis实现分布式锁
 * 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
 */
@Override
public void testLock() {

    //0.先尝试获取锁 setnx key val
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock");
    if(flag){
        //获取锁成功,执行业务代码
        //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
        String value = stringRedisTemplate.opsForValue().get("num");
        //2.如果值为空则非法直接返回即可
        if (StringUtils.isBlank(value)) {
            return;
        }
        //3.对num值进行自增加一
        int num = Integer.parseInt(value);
        stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));

        //4.将锁释放
        stringRedisTemplate.delete("lock");

    }else{
        try {
            Thread.sleep(100);
            this.testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

重启,服务集群,通过网关压力测试:

img

查看Redis中num的值:

img

基本实现。

问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放

解决:设置过期时间,自动释放锁。

2.3.2. 优化之设置锁的过期时间

设置过期时间有两种方式:

  1. 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

  2. 在set时指定过期时间(推荐)

img

设置过期时间:

img

压力测试肯定也没有问题。自行测试

问题:可能会释放其他服务器的锁。

场景:如果业务逻辑的执行时间是7s。执行流程如下

  1. index1业务逻辑没执行完,3秒后锁被自动释放。

  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

  3. index3获取到锁,执行业务逻辑

  4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁, 导致index3的业务只执行1s就被别人释放。

最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁

2.3.3. 优化之UUID防误删

img

img

问题:删除操作缺乏原子性。

场景:

  1. index1执行删除时,查询到的lock值确实和uuid相等

img

  1. index1执行删除前,lock刚好过期时间已到,被Redis自动释放

在Redis中没有了锁。

img

  1. index2获取了lock,index2线程获取到了cpu的资源,开始执行方法

  2. index1执行删除,此时会把index2的lock删除

index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行

img

删除的index2的锁!

2.3.4. 优化之LUA脚本保证删除的原子性

/**
 * 采用SpringDataRedis实现分布式锁
 * 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
 */
@Override
public void testLock() {

    //0.先尝试获取锁 setnx key val
    //问题:锁可能存在线程间相互释放
    //Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 10, TimeUnit.SECONDS);
    //解决:锁值设置为uuid
    String uuid = UUID.randomUUID().toString();
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);

    if(flag){
        //获取锁成功,执行业务代码
        //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
        String value = stringRedisTemplate.opsForValue().get("num");
        //2.如果值为空则非法直接返回即可
        if (StringUtils.isBlank(value)) {
            return;
        }
        //3.对num值进行自增加一
        int num = Integer.parseInt(value);
        stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));

        //4.将锁释放 判断uuid
        //问题:删除操作缺乏原子性。
        //if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){ //线程一:判断是满足是当前线程锁的值
        //    //条件满足,此时锁正好到期,redis锁自动释放了线程2获取锁成功,线程1将线程2的锁删除
        //    stringRedisTemplate.delete("lock");
        //}
        //解决:redis执行lua脚本保证原子,lua脚本执行会作为一个整体执行

        //执行脚本参数 参数1:脚本对象封装lua脚本,参数二:lua脚本中需要key参数(KEYS[i])  参数三:lua脚本中需要参数值 ARGV[i]
        //4.1 先创建脚本对象 DefaultRedisScript泛型脚本语言返回值类型 Long 0:失败 1:成功
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        //4.2设置脚本文本
        String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
                "then\n" +
                "    return redis.call(\"del\",KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        redisScript.setScriptText(script);
        //4.3 设置响应类型
        redisScript.setResultType(Long.class);
        stringRedisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
    }else{
        try {
            //睡眠
            Thread.sleep(100);
            //自旋重试
            this.testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

问题:

Redis集群状态下的问题:

  1. 客户端A从master获取到锁

  2. 在master将锁同步到slave之前,master宕掉了。

  3. slave节点被晋级为master节点

  4. 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。

安全失效!

解决方案:了解即可!

img

2.3.5. 总结

1、加锁

// 1. 从Redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
      .setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);

2、使用lua释放锁

// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);

3、重试

Thread.sleep(500); 
testLock();

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
  • 加锁和解锁必须具有原子性

2.4 使用Redisson 解决分布式锁

​ Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

官方文档地址:https://github.com/Redisson/Redisson/wiki

Github 地址:https://github.com/Redisson/Redisson

2.4.1 实现代码

  1. service-util模块导入依赖(已导入)

    <!-- Redisson -->
    <dependency>
      <groupId>org.Redisson</groupId>
      <artifactId>Redisson</artifactId>
      <version>3.15.3</version>
    </dependency>
    
  2. 配置Redisson客户端对象

    package com.atguigu.gmall.common.config;
       
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.redisson.config.SingleServerConfig;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.util.StringUtils;
       
       
    @Configuration
    public class RedissonConfig {
       
       private static String ADDRESS_PREFIX = "redis://%s:%d";
       private int timeout = 3000;
       
       
       @Bean
       public RedissonClient redissonClient(RedisProperties prop) {
           if (StringUtils.isEmpty(prop.getHost())) {
               throw new RuntimeException("host is  empty");
           }
           Config config = new Config();
           SingleServerConfig singleServerConfig = config.useSingleServer()
                   .setAddress(String.format(ADDRESS_PREFIX, prop.getHost(), prop.getPort()))
                   .setTimeout(timeout);
           if (!StringUtils.isEmpty(prop.getPassword())) {
               singleServerConfig.setPassword(prop.getPassword());
           }
           return Redisson.create(config);
       }
    }
    

注意:这里读取了一个名为RedisProperties的属性,因为我们引入了SpringDataRedis,Spring已经自动加载了RedisProperties,并且读取了配置文件中的Redis信息。

  1. 修改实现类

    @Autowired
    private RedissonClient redissonClient;
       
    /**
    * 采用Redisson实现分布式锁
    *
    */
    @Override
    public void testLock() {
       //0.1 创建锁对象
       RLock lock = redissonClient.getLock("lock");
       
       //0.2 获取锁
       //0.2.1 一直等待到获取锁,如果获取锁成功,锁的有效时间为:30s,底层启动"看门狗"线程(如果业务有超时风险)可以延迟锁有效时间
       lock.lock();
       
       //0.2.2 一直等待到获取锁,如果获取锁成功,自定义锁有效时间
       //lock.lock(10, TimeUnit.SECONDS);
       
       //0.2.3 尝试获取锁 参数1:等待获取锁时间,超时则放弃获取  参数2:如果获取锁成功,锁的有效时间 参数3:时间单位
       //boolean b = lock.tryLock(3, 10, TimeUnit.SECONDS);
       try {
           //1.从Redis缓存中获取key="num"的值  保证redis中存在"num"(手动提前在redis中创建key)
           String value = stringRedisTemplate.opsForValue().get("num");
           if (StringUtils.isBlank(value)) {
               return;
           }
           //2.对获取到值进行+1操作
           int num = Integer.parseInt(value);
           stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
       
       } catch (NumberFormatException e) {
           e.printStackTrace();
       } finally {
           //3. 释放锁
           lock.unlock();
       }
    }
    

2.4.2 可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

快速入门使用的就是可重入锁。也是最常使用的锁。

最常见的使用:

@Autowired
private RedissonClient redissonClient;

/**
 * 使用Redison实现分布式锁
 * 开发步骤:
 * 1.使用RedissonClient客户端对象 创建锁对象
 * 2.调用获取锁方法
 * 3.执行业务逻辑
 * 4.将锁释放
 */
public void testLock() {

    //0.创建锁对象
    RLock lock = redissonClient.getLock("lock1");

    try {
        //0.1 尝试加锁
        //0.1.1 lock() 阻塞等待一直到获取锁,默认锁有效期30s
        //lock.lock();
        //0.1.2 lock(数字,时间单位) 指定获取锁成功有效期,直到获取锁成功。到期自动释放
        //lock.lock(10, TimeUnit.SECONDS);
        //0.1.3 tryLock(等待获取锁时间,锁的有效期,时间单位) 指定时间内如果获取锁成功,返回true 执行后续业务,如果超过等待时间,返回false
        boolean flag = lock.tryLock(2, 10, TimeUnit.SECONDS);
        if (flag) {
            System.out.println(Thread.currentThread().getName() + "线程获取锁成功");
            //获取成功 执行业务
            //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
            String value = stringRedisTemplate.opsForValue().get("num");
            //2.如果值为空则非法直接返回即可
            if (StringUtils.isBlank(value)) {
                return;
            }
            //3.对num值进行自增加一
            int num = Integer.parseInt(value);
            stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));

            //测试可重入锁
            this.check();

            //4.将锁释放
            lock.unlock();
        } else {
            //本地获取锁失败
            Thread.sleep(100);
            this.testLock();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
        //如果执行异常,释放锁
        lock.unlock();
    }
}


/**
 * 测试可重入
 */
private void check() {
    //尝试获取锁
    RLock lock = redissonClient.getLock("lock1");
    lock.lock();

    //执行业务
    System.out.println(Thread.currentThread().getName() + "可重入获取成功");

    //释放锁
    lock.unlock();
}

看门狗原理:

1、如果我们指定了锁的超时时间,就发送给Redis执行脚本,进行占锁,默认超时就是我们制定的时间,不会自动续期; 2、如果我们未指定锁的超时时间,就使用 lockWatchdogTimeout = 30 * 1000 【看门狗默认时间】

改造程序:

img

重启后在浏览器测试:

2.4.3 读写锁(ReadWriteLock)

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。

分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

RReadWriteLock rwlock = Redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

代码实现

TestController

/**
 * 读数据接口
 * @return
 */
@GetMapping("/read")
public Result read(){
    String msg = testService.read();
    return Result.ok(msg);
}

/**
 * 写数据接口
 * @return
 */
@GetMapping("/write")
public Result write(){
    testService.write();
    return Result.ok("写入数据成功");
}

TestService接口

/**
 * 读数据接口
 * @return
 */
String read();

/**
 * 写数据接口
 * @return
 */
void write();

TestServiceImpl实现类

/**
 * 从Redis中读数据
 *
 * @return
 */
@Override
public String read() {
    //1.创建读写锁对象
    RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
    //2.获取读锁
    RLock lock = readWriteLock.readLock();
    //加锁 给锁的有效期设置为10s
    lock.lock(10, TimeUnit.SECONDS);

    //3.执行从redis获取数据业务
    String msg = stringRedisTemplate.opsForValue().get("msg");

    //4.释放读锁
    //lock.unlock();
    return msg;
}

/**
 * 将数据写入redis
 *
 * @return
 */
@Override
public void write() {
    //1.创建读写锁对象
    RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
    //2.获取写锁
    RLock lock = readWriteLock.writeLock();
    //加锁 给锁的有效期设置为10s
    lock.lock(10, TimeUnit.SECONDS);

    //3.业务将数据写入redis
    stringRedisTemplate.opsForValue().set("msg", "msgData");

    //4.释放写锁
    //lock.unlock();

}

打开两个浏览器窗口测试:

http://localhost:8206/admin/product/test/read

http://localhost:8206/admin/product/test/write

  • 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始

  • 同时访问读:不用等待

  • 先写后读:读要等待(约10s)写完成

  • 先读后写:写要等待(约10s)读完成

3、分布式锁改造获取sku信息

3.1 使用Redis实现分布式锁

RedisConst 类中追加一个变量

// 商品如果在数据库中不存在那么会缓存一个空对象进去,但是这个对象是没有用的,所以这个对象的过期时间应该不能太长,
// 如果太长会占用内存。
// 定义变量,记录空对象的缓存过期时间
public static final long SKUKEY_TEMPORARY_TIMEOUT = 10 * 60;

在实现类中引入

/**
 * 查询商品信息优化 采用SDR-SprignDataRedis实现分布式锁
 * 1.优先从缓存中获取数据
 * 2.获取分布式锁
 * 3.执行查询数据库逻辑
 *
 * @param skuId
 * @return
 */
@Override
public SkuInfo getSkuInfoAndImages(Long skuId) {
    try {
        String lockKey = "";
        String lockVal = "";
        //1.优先从Redis分布式缓存中获取业务数据 存在:直接返回  不存在:获取分布式锁
        //1.1 构建商品信息业务数据缓存Key 形式--  sku:24:info
        String dataKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKUKEY_SUFFIX;
        //1.2 通过业务Key从分布式缓存中获取业务数据
        SkuInfo skuInfo = (SkuInfo) redisTemplate.opsForValue().get(dataKey);
        if (skuInfo == null) {
            try {
                //2.尝试获取分布式锁  成功:执行查询数据库逻辑,放入缓存返回 失败:自旋
                lockKey = "lock";
                //2.1 构建当前商品锁值 UUID
                lockVal = UUID.randomUUID().toString().replaceAll("-", "");
                //2.2 调用set ex nx 获取锁
                Boolean flag = redisTemplate.opsForValue().setIfAbsent(lockKey, lockVal, RedisConst.SKULOCK_EXPIRE_PX1, TimeUnit.SECONDS);
                if (flag) {
                    //2.3 执行业务逻辑-从数据库中查询商品信息
                    skuInfo = this.getSkuInfoAndImagesFromDB(skuId);
                    //2.3.1 查询数据库获取业务数据  有值:放入缓存,响应业务数据  没有:null放入缓存(暂存)
                    if (skuInfo == null) {
                        redisTemplate.opsForValue().set(dataKey, skuId, RedisConst.SKUKEY_TIMEOUT, TimeUnit.MILLISECONDS);
                        return skuInfo;
                    }
                    //3.3.2 数据库中有数据,将业务数据放入缓存,返回业务数据
                    redisTemplate.opsForValue().set(dataKey, skuInfo, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
                    return skuInfo;
                } else {
                    try {
                        //本次获取锁失败-自旋
                        Thread.sleep(100);
                        this.getSkuInfoAndImages(skuId);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //3.将锁释放 采用lua脚本释放锁
                String scriptText = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
                        "then\n" +
                        "    return redis.call(\"del\",KEYS[1])\n" +
                        "else\n" +
                        "    return 0\n" +
                        "end";
                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
                redisScript.setScriptText(scriptText);
                redisTemplate.execute(redisScript, Arrays.asList(lockKey), lockVal);
            }
        } else {
            //缓存中业务数据直接返回即可
            return skuInfo;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    //兜底处理-查询数据库
    return this.getSkuInfoAndImagesFromDB(skuId);
}


/**
 * 根据SkuID查询sku信息以及图片
 *
 * @param skuId
 * @return
 */
public SkuInfo getSkuInfoAndImagesFromDB(Long skuId) {
    //1.根据主键ID查询商品Sku信息
    SkuInfo skuInfo = skuInfoService.getById(skuId);
    //2.根据SKuID查询商品SKU的图片列表
    if (skuInfo != null) {
        LambdaQueryWrapper<SkuImage> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SkuImage::getSkuId, skuId);
        List<SkuImage> skuImageList = skuImageService.list(queryWrapper);
        skuInfo.setSkuImageList(skuImageList);
    }
    return skuInfo;
}

3.2 使用Redisson实现分布式锁

在实现类添加

/**
 * 查询商品信息优化 采用Redisson实现分布式锁
 * 1.优先从缓存中获取数据
 * 2.获取分布式锁
 * 3.执行查询数据库逻辑
 *
 * @param skuId
 * @return
 */
@Override
public SkuInfo getSkuInfoAndImages(Long skuId) {
    try {
        //1.优先从分布式缓存Redis 中获取数据
        //1.1 构建商品缓存业务Key 形式: sku:商品ID:info
        String dataKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKUKEY_SUFFIX;
        //1.2 从Redis分布式缓存中获取数据  有值:直接返回 没有:获取分布式锁后执行业务
        SkuInfo skuInfo = (SkuInfo) redisTemplate.opsForValue().get(dataKey);
        if (skuInfo == null) {
            //2.尝试获取分布锁-采用Redisson实现分布式锁
            ///2.1 构建商品锁名称 形式:sku:商品id:lock
            String lockKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKULOCK_SUFFIX;
            RLock lock = redissonClient.getLock(lockKey);
            try {
                //2.2 创建锁对象,尝试获取锁
                boolean flag = lock.tryLock(RedisConst.SKULOCK_EXPIRE_PX1, RedisConst.SKULOCK_EXPIRE_PX2, TimeUnit.SECONDS);
                if (flag) {
                    //2.3 执行查询数据库业务
                    //2.3.1 查询商品信息
                    skuInfo = this.getSkuInfoAndImagesFromDB(skuId);
                    if (skuInfo == null) {
                        //将null对象暂存到redis
                        redisTemplate.opsForValue().set(dataKey, skuId, RedisConst.SKUKEY_TIMEOUT, TimeUnit.MILLISECONDS);
                        return skuInfo;
                    } else {
                        //2.3.2 将商品信息放入缓存
                        redisTemplate.opsForValue().set(dataKey, skuId, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
                        //2.3.3 返回结果
                        return skuInfo;
                    }
                } else {
                    //获取锁失败,自旋
                    return this.getSkuInfoAndImages(skuId);
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //3.业务执行完毕,将锁释放掉
                lock.unlock();
            }

        } else {
            //命中缓存数据,直接返回
            return skuInfo;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    //兜底操作:查询数据库
    return this.getSkuInfoAndImagesFromDB(skuId);
}

/**
 * 根据SkuID查询sku信息以及图片
 *
 * @param skuId
 * @return
 */
public SkuInfo getSkuInfoAndImagesFromDB(Long skuId) {
    //1.根据主键ID查询商品Sku信息
    SkuInfo skuInfo = skuInfoService.getById(skuId);
    //2.根据SKuID查询商品SKU的图片列表
    if (skuInfo != null) {
        LambdaQueryWrapper<SkuImage> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SkuImage::getSkuId, skuId);
        List<SkuImage> skuImageList = skuImageService.list(queryWrapper);
        skuInfo.setSkuImageList(skuImageList);
    }
    return skuInfo;
}

3.3 在getSkuInfo 中调用上述两个方法进行测试

/**
 * 根据SkuID查询SKU商品信息包含图片列表
 *
 * @param skuId
 * @return
 */
@Override
public SkuInfo getSkuInfo(Long skuId) {
    //改造前从数据库直接查询
    //SkuInfo skuInfo = getSkuInfoDB(skuId);
    //改造后:缓存+分布式锁(RedisTemplate方式)
    //SkuInfo skuInfo = getSkuInfoRedis(skuId);

    //改造后:缓存+分布式锁(Redisson方式)
    SkuInfo skuInfo = getSkuInfoRedisson(skuId);
    return skuInfo;
}

4、分布式锁 + AOP实现缓存

随着业务中缓存及分布式锁的加入,业务代码变的复杂起来,除了需要考虑业务逻辑本身,还要考虑缓存及分布式锁的问题,增加了程序员的工作量及开发难度。而缓存的玩法套路特别类似于事务,而声明式事务就是用了aop的思想实现的。

img

  1. @Transactional 注解为植入点的切点,这样才能知道@Transactional注解标注的方法需要被代理。

  2. @Transactional注解的切面逻辑类似于@Around

模拟事务,缓存可以这样实现:

自定义缓存注解@GmallCache(类似于事务@Transactional

  1. 编写切面类,使用环绕通知实现缓存的逻辑封装

img

4.1 定义一个注解

service-util模块中定义缓存注解

package com.atguigu.gmall.common.cache;

import java.lang.annotation.*;

/**
 * 自定义注解实现目标:只要被该注解修饰方法,具备高并发访问场景.要求优先从缓存中获取数据,分布式锁避免缓存击穿问题
 * 修饰注解的信息注解-元注解
 * @Retention: 注解作用范围 SOURCE->CLASS->RUNTIME 左边作用范围包含右边
 * @Target: 注解可以修饰地方  TYPE:类上 METHOD:方法 PARAMETER:方法参数  FIELD:成员变量
 * @Inherited: 子类是否能够集成父类注解
 * @Documented: 通过Javadoc命令生成类文档,文档中是否包含该注解信息
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Inherited
@Documented
public @interface GmallCache {


    /**
     * 设置存入redis缓存业务数据Key前缀
     * @return
     */
    String prefix() default "cache:";


    /**
     * 设置存入redis缓存业务数据Key后缀
     * @return
     */
    String suffix() default ":info";
}

4.2 定义一个切面类加上注解

Spring aop 参考文档:

https://docs.spring.io/spring-framework/docs/5.3.9-SNAPSHOT/reference/html/core.html#aop img

img

package com.atguigu.gmall.common.cache;

import com.atguigu.gmall.common.constant.RedisConst;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * 切面类:对@GmallCache修饰方法进行增强
 * 通知:优先从缓存中获取数据;通过分布式锁避免缓存击穿问题
 *
 * @author: atguigu
 * @create: 2023-02-28 10:29
 */
@Slf4j
@Aspect
@Component
public class GmallCacheAspect {


    @Autowired
    private RedisTemplate redisTemplate;


    @Autowired
    private RedissonClient redissonClient;


    /**
     * 切面类:环绕通知逻辑
     *
     * @param joinPoint 切点方法连接点对象
     * @return
     */
    @SneakyThrows
    @Around("@annotation(com.atguigu.gmall.common.cache.GmallCache)")
    public Object cacheAspect(ProceedingJoinPoint joinPoint) {
        Object objectResult = null;
        try {
            //一.前置通知,目标方法执行之前
            log.info("前置通知...");
            //1.优先从缓存中获取数据 命中缓存:直接返回  未命中:获取分布式锁后执行查库业务
            //1.1 构建redis缓存中业务数据的Key 从@Gmallcache注解获取业务Key前缀跟后缀,从方法中获取方法参数作为业务标识
            //1.1.1 通过切点对象获取目标方法信息,通过目标方法获取方法上注解,获取注解中参数
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
            //1.1.2 获取方法上自定义注解 从注解获取信息
            GmallCache gmallCache = signature.getMethod().getAnnotation(GmallCache.class);
            //1.1.3 处理方法参数
            List<Object> argList = Arrays.asList(joinPoint.getArgs());
            //1.1.4 将集合中参数采用指定符号拼接
            String argsStr = argList.stream().map(arg -> {
                return arg.toString();
            }).collect(Collectors.joining("|"));
            String dataKey = gmallCache.prefix() + argsStr + gmallCache.suffix();

            //1.2 尝试从缓存中获取业务数据
            objectResult = redisTemplate.opsForValue().get(dataKey);
            if (objectResult == null) {
                //1.2.1 没有命中缓存数据
                //2.尝试获取分布式锁 成功:执行查库业务代码  失败:自旋,尝试下次获取
                //2.1 构建业务数据锁的Key
                String lockKey = gmallCache.prefix() + argsStr + RedisConst.SKULOCK_SUFFIX;

                //2.2 创建锁对象
                RLock lock = redissonClient.getLock(lockKey);

                //2.3 尝试获取锁 成功:执行目标方法进行查询数据库操作  失败:自旋
                Boolean flag = lock.tryLock(RedisConst.SKULOCK_EXPIRE_PX1, RedisConst.SKULOCK_EXPIRE_PX2, TimeUnit.SECONDS);
                try {
                    if (flag) {
                        //2.3.1 获取锁成功 查询数据库业务数据,将业务数据进行缓存
                        //二.执行目标方法-执行目标方法(原来类中方法)-都是执行查询数据库操作
                        objectResult = joinPoint.proceed(joinPoint.getArgs());
                        if (objectResult == null) {
                            //数据库中不存在该数据 短暂进行缓存null对象
                            redisTemplate.opsForValue().set(dataKey, objectResult, RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
                            return objectResult;
                        } else {
                            //数据库中有数据,则进行缓存 返回业务数据
                            redisTemplate.opsForValue().set(dataKey, objectResult, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
                            return objectResult;
                        }
                    } else {
                        //2.3.2 本次获取锁失败,等待下次获取,自旋:再次调用增强方法
                        Thread.sleep(100);
                        return this.cacheAspect(joinPoint);
                    }
                } finally {
                    //3.业务执行完毕后将锁释放
                    lock.unlock();
                }
            } else {
                //1.2.2 命中缓存 直接返回即可
                return objectResult;
            }
            //三.后置后置,目标发方法执行后
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        //执行查询数据库-执行目标方法
        return joinPoint.proceed(joinPoint.getArgs());
    }
}

4.3 使用注解完成缓存

/**
 * 根据SkuID查询sku信息以及图片
 *
 * @param skuId
 * @return
 */
@GmallCache(prefix = RedisConst.SKUKEY_PREFIX, suffix = RedisConst.SKUKEY_SUFFIX)
public SkuInfo getSkuInfoAndImages(Long skuId) ...
    
    
/**
 * 根据商品SKU三级分类ID查询分类信息
 *
 * @param category3Id
 * @return
 */
@GetMapping("/inner/getCategoryView/{category3Id}")
@GmallCache(prefix = "categoryView")
public BaseCategoryView getCategoryView(@PathVariable("category3Id") Long category3Id) ..
    
    
/**
 * 根据SPUID查询详情页海报图片列表
 *
 * @param spuId
 * @return
 */
@Override
@GmallCache(prefix = "spuPosterBySpuId:")...
public List<SpuPoster> getSpuPosterBySpuId(Long spuId)

/**
 * 查询当前商品所有的销售属性,判断为当前SKU拥有销售属性增加选中效果
 *
 * @param skuId
 * @param spuId
 * @return
 */
@Override
@GmallCache(prefix = "spuSaleAttrListCheckBySku:", suffix = ":info")
public List<SpuSaleAttr> getSpuSaleAttrListCheckBySku(Long skuId, Long spuId) ..
    
    
com.atguigu.gmall.product.service.impl.SkuManageServiceImpl#getAttrList
    
/**
 * 根据SkuID查询当前商品包含平台属性以及属性值
 *
 * @param skuId
 * @return
 */
@Override
@GmallCache(prefix = "attrList:")
public List<BaseAttrInfo> getAttrList(Long skuId) ..
    
/**
 * 获取每一组销售属性对应SkuID组合 {"3736|3738":"24","3736|3739":"25",}
 *
 * @param spuId
 * @return
 */
@Override
@GmallCache(prefix = "skuValueIdsMap:")
public String getSkuValueIdsMap(Long spuId)

5、布隆过滤器

5.1 布隆过滤器原理

5.1.1 什么是布隆过滤器

布隆过滤器(Bloom Filter),是1970年,由一个叫布隆的小伙子提出的,距今已经五十年了。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。二进制大家应该都清楚,存储的数据不是0就是1,默认是0。

主要用于判断一个元素是否在一个集合中,0代表不存在某个数据,1代表存在某个数据。

总结: 判断一个元素一定不存在 或者 可能存在! 存在一定的误判率{通过代码调节}

Bit 数组:

0 0 0 0 0 0 0 0 0

5.1.2 布隆过滤器使用场景

大数据量的时候, 判断一个元素是否在一个集合中。解决缓存穿透问题

5.1.3 原理

存入过程

布隆过滤器上面说了,就是一个二进制数据的集合。当一个数据加入这个集合时,经历如下:

通过K个哈希函数计算该数据,返回K个计算出的hash值

这些K个hash值映射到对应的K个二进制的数组下标

将K个下标对应的二进制数据改成1。

如图所示:

img

查询过程

布隆过滤器主要作用就是查询一个数据,在不在这个二进制的集合中,查询过程如下:

1、通过K个哈希函数计算该数据,对应计算出的K个hash值

2、通过hash值找到对应的二进制的数组下标

3、判断:如果存在一处位置的二进制数据是0,那么该数据不存在。如果都是1,该数据可能存在集合中。

5.1.4 布隆过滤器的优缺点

优点

  1. 由于存储的是二进制数据,所以占用的空间很小

  2. 它的插入和查询速度是非常快的,时间复杂度是O(K),空间复杂度:O (M)。

    • K: 是哈希函数的个数

    • M: 是二进制位的个数

  3. 保密性很好,因为本身不存储任何原始数据,只有二进制数据

缺点

  1. 添加数据是通过计算数据的hash值,那么很有可能存在这种情况:两个不同的数据计算得到相同的hash值。

img

例如图中的“张三”和“张三丰”,假如最终算出hash值相同,那么他们会将同一个下标的二进制数据改为1。

这个时候,你就不知道下标为1的二进制,到底是代表“张三”还是“张三丰”。

  1. 无法删除元素

由此得出如下缺点:

一、存在误判

假如上面的图没有存 "张三",只存了 "张三丰",那么用"张三"来查询的时候,会判断"张三"存在集合中。

因为“张三”和“张三丰”的hash值是相同的,通过相同的hash值,找到的二进制数据也是一样的,都是1。

误判率:

​ 受三个因素影响: 二进制位的个数m, 哈希函数的个数k, 数据规模n (添加到布隆过滤器中的函数)

img

已知误判率p, 数据规模n, 求二进制的个数m,哈希函数的个数k {m,k 程序会自动计算 ,你只需要告诉我数据规模,误判率就可以了}

img

ln: 自然对数是以常数e为底数对数,记作lnN(N>0)。在物理学,生物学等自然科学中有重要的意义,一般表示方法为lnx。数学中也常见以logx表示自然对数。

二、删除困难

还是用上面的举例,因为“张三”和“张三丰”的hash值相同,对应的数组下标也是一样的。

如果你想去删除“张三”,将下标为1里的二进制数据,由1改成了0。

那么你是不是连“张三丰”都一起删了呀。

5.2 实现方式

5.2.1 初始化skuId的布隆过滤器

RedisConst 常量类中添加,使用Redis中数据结构:bitmap

//  布隆过滤器使用!
public static final String SKU_BLOOM_FILTER="sku:bloom:filter";

操作模块:service-product

修改启动类

package com.atguigu.gmall;

import com.atguigu.gmall.common.constant.RedisConst;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.ComponentScan;

/**
 * @author: atguigu
 * @create: 2022-12-21 14:39
 */
@SpringBootApplication
@EnableDiscoveryClient
//@ComponentScan(basePackages = {"com.xxx"})
public class ProductApp implements CommandLineRunner {
    public static void main(String[] args) {
        SpringApplication.run(ProductApp.class, args);
    }


    @Autowired
    private RedissonClient redissonClient;

    /**
     * Springboot应用初始化后会执行一次该方法
     * @param args
     * @throws Exception
     */
    @Override
    public void run(String... args) throws Exception {
        //初始化布隆过滤器
        RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER);
        //设置数据规模 误判率 预计统计元素数量为100000,期望误差率为0.01
        bloomFilter.tryInit(100000, 0.01);
    }
}

SpringBoot的CommandLineRunner接口主要用于实现在应用初始化后,去执行一段代码块逻辑,这段初始化代码在整个应用生命周期内只会执行一次。

5.2.2 给商品详情页添加布隆过滤器

1、查看商品详情页添加布隆过滤器

操作模块:service-item

更改ItemServiceImpl.getBySkuId方法

@Autowired
private RedissonClient redissonClient;

/**
 * 远程调用商品微服务:根据skuID汇总sku商品详情页所有数据
 *
 * @param skuId
 * @return
 */
@Override
public Map<String, Object> getBySkuId(Long skuId) {
    //0.创建响应结果Map
    Map<String, Object> result = new HashMap<>();

    //远程调用商品微服务7个接口之前 提前知道用户访问商品SKUID是否存在与布隆过滤器
    RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER);
    if (!bloomFilter.contains(skuId)) {
        log.error("用户查询商品sku不存在:{}", skuId);
        //查询数据不存在直接返回空对象
        return result;
    }
    ...
}

2、添加商品sku加入布隆过滤器数据

操作模块:service-product

更改SkuManageServiceImpl.saveSkuInfo方法

/**
 * 保存SKU信息
 * 1.将SKU基本信息存入sku_info表中
 * 2.将提交SKU图片存入sku_image表 关联SKU  设置sku_id逻辑外键
 * 3.将提交的平台属性列表 批量保存 sku_attr_value  关联SKU  设置sku_id逻辑外键
 * 4.将提交的销售属性列表 批量保存 sku_sale_attr_value  关联SKU  设置sku_id逻辑外键
 *
 * @param skuInfo SKU相关信息
 */
public void saveSkuInfo(SkuInfo skuInfo) {
    /*
        将数据插入数据库表!
        skuInfo:   库存单元表! 记录是哪个spu下的sku!
        skuImage:  库存单元图片表!
        skuSaleAttrValue;  sku 与 销售属性值中间表!
        skuAttrValue:  sku 与平台属性值中间表!
     */
    skuInfoMapper.insert(skuInfo);

    ...

	//将新增的商品SKUID存入布隆过滤器
    //5. 获取布隆过滤器,将新增skuID存入布隆过滤器
    RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER);
    bloomFilter.add(skuInfo.getId());
}