学习目标:
虽然咱们实现了页面需要的功能,但是考虑到该页面是被用户高频访问的,所以性能需要优化。一般一个系统最大的性能瓶颈,就是数据库的io操作。从数据库入手也是调优性价比最高的切入点。
一般分为两个层面:
重点要讲的是另外一个层面:尽量避免直接查询数据库。
解决办法就是:缓存
由于Redis作为缓存数据库,要被多个项目使用,所以要制作一个通用的工具类,方便工程中的各个模块使用。
而主要使用Redis的模块,都是后台服务的模块,service工程。所以咱们把Redis的工具类放到service-util模块中,这样所有的后台服务模块都可以使用Redis。
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X集成Redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
在service-util
配置自定义RedisTemplate对象-设置Key,Val的序列化方式
package com.atguigu.gmall.common.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@EnableCaching
public class RedisConfig {
@Primary
@Bean
public RedisTemplate<Object, Object> RedisTemplate(RedisConnectionFactory RedisConnectionFactory) {
RedisTemplate<Object, Object> RedisTemplate = new RedisTemplate<>();
RedisTemplate.setConnectionFactory(RedisConnectionFactory);
//使用Jackson2JsonRedisSerialize 替换默认序列化(默认采用的是JDK序列化) 对存储的对象进行JSON序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 序列化key value
RedisTemplate.setKeySerializer(new StringRedisSerializer());
RedisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
RedisTemplate.setHashKeySerializer(new StringRedisSerializer());
RedisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
RedisTemplate.afterPropertiesSet();
return RedisTemplate;
}
}
说明:由于service-util
属于公共模块,所以我们把它引入到service父模块,其他service子模块都自动引入了
开始开发先说明Redis key的命名规范,由于Redis不像数据库表那样有结构,其所有的数据全靠key进行索引,所以Redis数据的可读性,全依靠key。
企业中最常用的方式就是:
object:id:field
比如:sku:1314:info
user:1092:info
: 表示根据windows的 / 一个意思
在RedisConst中定义Redis的常量,RedisConst类在service-util模块中,所有的Redis常量我们都配置在这里
package com.atguigu.gmall.common.constant;
/**
* Redis常量配置类
*
*/
public class RedisConst {
public static final String SKUKEY_PREFIX = "sku:";
public static final String SKUKEY_SUFFIX = ":info";
//单位:秒
public static final long SKUKEY_TIMEOUT = 24 * 60 * 60;
}
缓存最常见的3个问题: 面试
缓存穿透
缓存雪崩
缓存击穿
缓存穿透: 是指查询一个不存在的数据,由于缓存无法命中,将去查询数据库,但是数据库也无此记录,并且出于容错考虑,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。
解决1 :空结果也进行缓存,但它的过期时间会很短,最长不超过五分钟,但是不能防止随机穿透。
解决2 :使用布隆过滤器来解决随机穿透问题。
缓存雪崩:是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
解决1:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
解决2:如果单节点宕机,可以采用集群部署方式防止雪崩
缓存击穿: 是指对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:如果这个key在大量请求同时进来之前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。
与缓存雪崩的区别:
击穿是一个热点key失效
雪崩是很多key集体失效
之前,我们学习过synchronized 及lock锁,这些锁都是本地锁。接下来写一个案例,演示本地锁的问题
在service-product
中新建TestController中添加测试方法
package com.atguigu.gmall.product.controller;
@Api(tags = "测试接口")
@RestController
@RequestMapping("admin/product/test")
public class TestController {
@Autowired
private TestService testService;
@GetMapping("testLock")
public Result testLock() {
testService.testLock();
return Result.ok();
}
}
业务接口
package com.atguigu.gmall.product.service;
public interface TestService {
void testLock();
}
业务实现类
package com.atguigu.gmall.product.service.impl;
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void testLock() {
// 查询Redis中的num值
String value = (String)this.redisTemplate.opsForValue().get("num");
// 没有该值return
if (StringUtils.isBlank(value)){
return ;
}
// 有值就转成成int
int num = Integer.parseInt(value);
// 把Redis中的num值+1
this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
}
}
说明:通过reids客户端设置num=0
登录虚拟机192.168.200.128,命令行中使用ab测试工具:httpd-tools(yum install -y httpd-tools)已安装
命令语法:ab -n(一次发送的请求数) -c(请求的并发数) 访问路径
测试如下:5000请求,100并发 注意:关闭本地Windows方法
ab -n 5000 -c 100 http://192.168.200.1:8206/admin/product/test/testLock
查看Redis中的值:
@Override
public synchronized void testLock() {
// 查询Redis中的num值
String value = (String)this.redisTemplate.opsForValue().get("num");
// 没有该值return
if (StringUtils.isBlank(value)){
return ;
}
// 有值就转成成int
int num = Integer.parseInt(value);
// 把Redis中的num值+1
this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
}
使用ab工具压力测试:5000次请求,并发100
查看Redis中的结果:
完美!与预期一致,是否真的完美?
接下来再看集群情况下,会怎样?
接下来启动8206 8216 8226 三个运行实例,运行多个service-product
实例:
server.port=8216
server.port=8226
注意:bootstrap.properties 添加一个server.port = 8206; 将nacos的配置注释掉!
通过网关压力测试:
启动网关:
ab -n 5000 -c 100 http://192.168.200.1/admin/product/test/testLock
查看Redis中的值:
集群情况下又出问题了!!!
以上测试,可以发现:
本地锁只能锁住同一工程内的资源,在分布式系统里面都存在局限性。
此时需要分布式锁。。
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁主流的实现方案:
基于数据库实现分布式锁
基于缓存( Redis等)
基于Zookeeper
每一种分布式锁解决方案都有各自的优缺点:
性能:Redis最高
可靠性:zookeeper最高
因为Redis具备高性能、高可用、高并发的特性,这里,我们就基于Redis实现分布式锁。
分布式锁的关键是多进程共享的内存标记(锁),因此只要我们在Redis中放置一个这样的标记(数据)就可以了。不过在实现过程中,不要忘了我们需要实现下列目标:
多进程可见:多进程可见,否则就无法实现分布式效果
避免死锁:死锁的情况有很多,我们要思考各种异常导致死锁的情况,保证锁可以被释放
尝试获取锁
成功:执行业务代码 执行业务 try(){业务代码-宕机} catch() finally{ 释放锁}
失败:等待;失效;下次
排它:同一时刻,只能有一个进程获得锁
高可用:避免锁服务宕机或处理好宕机的补救措施(redis集群架构:1.主从复制 2.哨兵 3.cluster集群)
多个客户端同时获取锁(setnx)
获取成功,执行业务逻辑{从db获取数据,放入缓存,执行完成释放锁(del)
其他客户端等待重试
/**
* 采用SpringDataRedis实现分布式锁
* 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
*/
@Override
public void testLock() {
//0.先尝试获取锁 setnx key val
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock");
if(flag){
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放
stringRedisTemplate.delete("lock");
}else{
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
重启,服务集群,通过网关压力测试:
查看Redis中num的值:
基本实现。
问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放
解决:设置过期时间,自动释放锁。
设置过期时间有两种方式:
首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
在set时指定过期时间(推荐)
设置过期时间:
压力测试肯定也没有问题。自行测试
问题:可能会释放其他服务器的锁。
场景:如果业务逻辑的执行时间是7s。执行流程如下
index1业务逻辑没执行完,3秒后锁被自动释放。
index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
index3获取到锁,执行业务逻辑
index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁, 导致index3的业务只执行1s就被别人释放。
最终等于没锁的情况。
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
问题:删除操作缺乏原子性。
场景:
在Redis中没有了锁。
index2获取了lock,index2线程获取到了cpu的资源,开始执行方法
index1执行删除,此时会把index2的lock删除
index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行
删除的index2的锁!
/**
* 采用SpringDataRedis实现分布式锁
* 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
*/
@Override
public void testLock() {
//0.先尝试获取锁 setnx key val
//问题:锁可能存在线程间相互释放
//Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 10, TimeUnit.SECONDS);
//解决:锁值设置为uuid
String uuid = UUID.randomUUID().toString();
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);
if(flag){
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放 判断uuid
//问题:删除操作缺乏原子性。
//if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){ //线程一:判断是满足是当前线程锁的值
// //条件满足,此时锁正好到期,redis锁自动释放了线程2获取锁成功,线程1将线程2的锁删除
// stringRedisTemplate.delete("lock");
//}
//解决:redis执行lua脚本保证原子,lua脚本执行会作为一个整体执行
//执行脚本参数 参数1:脚本对象封装lua脚本,参数二:lua脚本中需要key参数(KEYS[i]) 参数三:lua脚本中需要参数值 ARGV[i]
//4.1 先创建脚本对象 DefaultRedisScript泛型脚本语言返回值类型 Long 0:失败 1:成功
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
//4.2设置脚本文本
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
redisScript.setScriptText(script);
//4.3 设置响应类型
redisScript.setResultType(Long.class);
stringRedisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
}else{
try {
//睡眠
Thread.sleep(100);
//自旋重试
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
问题:
Redis集群状态下的问题:
客户端A从master获取到锁
在master将锁同步到slave之前,master宕掉了。
slave节点被晋级为master节点
客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。
安全失效!
解决方案:了解即可!
1、加锁
// 1. 从Redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
.setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);
2、使用lua释放锁
// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);
3、重试
Thread.sleep(500);
testLock();
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
官方文档地址:https://github.com/Redisson/Redisson/wiki
Github 地址:https://github.com/Redisson/Redisson
service-util
模块导入依赖
<!-- Redisson -->
<dependency>
<groupId>org.Redisson</groupId>
<artifactId>Redisson</artifactId>
<version>3.15.3</version>
</dependency>
配置Redisson客户端对象
package com.atguigu.gmall.common.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
@Configuration
public class RedissonConfig {
private static String ADDRESS_PREFIX = "redis://%s:%d";
private int timeout = 3000;
@Bean
public RedissonClient redissonClient(RedisProperties prop) {
if (StringUtils.isEmpty(prop.getHost())) {
throw new RuntimeException("host is empty");
}
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer()
.setAddress(String.format(ADDRESS_PREFIX, prop.getHost(), prop.getPort()))
.setTimeout(timeout);
if (!StringUtils.isEmpty(prop.getPassword())) {
singleServerConfig.setPassword(prop.getPassword());
}
return Redisson.create(config);
}
}
注意:这里读取了一个名为RedisProperties的属性,因为我们引入了SpringDataRedis,Spring已经自动加载了RedisProperties,并且读取了配置文件中的Redis信息。
修改实现类
@Autowired
private RedissonClient redissonClient;
/**
* 使用Redison实现分布式锁
* 开发步骤:
* 1.使用RedissonClient客户端对象 创建锁对象
* 2.调用获取锁方法
* 3.执行业务逻辑
* 4.将锁释放
*
*/
public void testLock() {
//0.创建锁对象
RLock lock = redissonClient.getLock("lock1");
//0.1 尝试加锁
//0.1.1 lock() 阻塞等待一直到获取锁,默认锁有效期30s
lock.lock();
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放
lock.unlock();
}
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
快速入门使用的就是可重入锁。也是最常使用的锁。
最常见的使用:
@Autowired
private RedissonClient redissonClient;
/**
* 使用Redison实现分布式锁
* 开发步骤:
* 1.使用RedissonClient客户端对象 创建锁对象
* 2.调用获取锁方法
* 3.执行业务逻辑
* 4.将锁释放
*/
public void testLock() {
//0.创建锁对象
RLock lock = redissonClient.getLock("lock1");
try {
//0.1 尝试加锁
//0.1.1 lock() 阻塞等待一直到获取锁,默认锁有效期30s
//lock.lock();
//0.1.2 lock(数字,时间单位) 指定获取锁成功有效期,直到获取锁成功。到期自动释放
//lock.lock(10, TimeUnit.SECONDS);
//0.1.3 tryLock(等待获取锁时间,锁的有效期,时间单位) 指定时间内如果获取锁成功,返回true 执行后续业务,如果超过等待时间,返回false
boolean flag = lock.tryLock(2, 10, TimeUnit.SECONDS);
if (flag) {
System.out.println(Thread.currentThread().getName() + "线程获取锁成功");
//获取成功 执行业务
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//测试可重入锁
this.check();
//4.将锁释放
lock.unlock();
} else {
//本地获取锁失败
Thread.sleep(100);
this.testLock();
}
} catch (InterruptedException e) {
e.printStackTrace();
//如果执行异常,释放锁
lock.unlock();
}
}
/**
* 测试可重入
*/
private void check() {
//尝试获取锁
RLock lock = redissonClient.getLock("lock1");
lock.lock();
//执行业务
System.out.println(Thread.currentThread().getName() + "可重入获取成功");
//释放锁
lock.unlock();
}
看门狗原理:
1、如果我们指定了锁的超时时间,就发送给Redis执行脚本,进行占锁,默认超时就是我们制定的时间,不会自动续期; 2、如果我们未指定锁的超时时间,就使用
lockWatchdogTimeout = 30 * 1000
【看门狗默认时间】
改造程序:
重启后在浏览器测试:
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = Redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
代码实现
TestController
/**
* 读数据接口
* @return
*/
@GetMapping("/read")
public Result read(){
String msg = testService.read();
return Result.ok(msg);
}
/**
* 写数据接口
* @return
*/
@GetMapping("/write")
public Result write(){
testService.write();
return Result.ok("写入数据成功");
}
TestService接口
/**
* 读数据接口
* @return
*/
String read();
/**
* 写数据接口
* @return
*/
void write();
TestServiceImpl实现类
/**
* 从Redis中读数据
*
* @return
*/
@Override
public String read() {
//1.创建读写锁对象
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
//2.获取读锁
RLock lock = readWriteLock.readLock();
//加锁 给锁的有效期设置为10s
lock.lock(10, TimeUnit.SECONDS);
//3.执行从redis获取数据业务
String msg = stringRedisTemplate.opsForValue().get("msg");
//4.释放读锁
//lock.unlock();
return msg;
}
/**
* 将数据写入redis
*
* @return
*/
@Override
public void write() {
//1.创建读写锁对象
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
//2.获取写锁
RLock lock = readWriteLock.writeLock();
//加锁 给锁的有效期设置为10s
lock.lock(10, TimeUnit.SECONDS);
//3.业务将数据写入redis
stringRedisTemplate.opsForValue().set("msg", "msgData");
//4.释放写锁
//lock.unlock();
}
打开两个浏览器窗口测试:
http://localhost:8206/admin/product/test/read
http://localhost:8206/admin/product/test/write
同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始
同时访问读:不用等待
先写后读:读要等待(约10s)写完成
先读后写:写要等待(约10s)读完成
RedisConst 类中追加一个变量
// 商品如果在数据库中不存在那么会缓存一个空对象进去,但是这个对象是没有用的,所以这个对象的过期时间应该不能太长,
// 如果太长会占用内存。
// 定义变量,记录空对象的缓存过期时间
public static final long SKUKEY_TEMPORARY_TIMEOUT = 10 * 60;
在实现类中引入
@Autowired
private RedisTemplate redisTemplate;
/**
* 根据SkuID查询SKU商品信息包含图片列表
*
* @param skuId
* @return
*/
@Override
public SkuInfo getSkuInfo(Long skuId) {
//改造前从数据库直接查询
//SkuInfo skuInfo = getSkuInfoDB(skuId);
//改造1:缓存+分布式锁(RedisTemplate)
SkuInfo skuInfo = getSkuInfoRedis(skuId);
return skuInfo;
}
/**
* 采用SpringDataRedis提供实现分布锁
* 优先从缓存中获取商品信息,缓存未命中,避免出现缓存击穿,分布式锁
*
* @param skuId
* @return
*/
private SkuInfo getSkuInfoRedis(Long skuId) {
try {
//1.优先从缓存中获取数据,如果命中缓存则直接返回,未命中-采用分布式锁避免缓存击穿
//1.1 构建商品详情key-缓存商品信息Key 形式: sku:29:info
String skuKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKUKEY_SUFFIX;
//1.2 根据key查询缓存中商品数据
SkuInfo skuInfo = (SkuInfo) redisTemplate.opsForValue().get(skuKey);
if (skuInfo == null) {
//2.尝试获取锁 获取锁成功-执行数据库查询,有值:将查询结果放入缓存 没值:缓存空对象(暂存)
//2.1 为每个查询商品构建商品SKU锁Key值 形式:sku:29:lock sku:30:lock
String lockKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKULOCK_SUFFIX;
//2.2 构建锁的值UUID
String uuid = UUID.randomUUID().toString().replace("-", "");
//2.3 调用redis执行set key val ex 10 nx 获取锁
Boolean flag = redisTemplate.opsForValue().setIfAbsent(lockKey, uuid, RedisConst.SKULOCK_EXPIRE_PX1, TimeUnit.SECONDS);
//2.3.1 获取锁成功,执行业务(查询数据库,放入缓存)
if (flag) {
//2.4 执行业务:查询数据库获取商品信息
skuInfo = this.getSkuInfoDB(skuId);
//2.4.1 数据库中本身不存在 短时间缓存空对象,返回空对象
if (skuInfo == null) {
redisTemplate.opsForValue().set(skuKey, skuInfo, RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
return skuInfo;
}
//2.4.2 数据库中有数据,将数据加入缓存
redisTemplate.opsForValue().set(skuKey, skuInfo, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
//2.5 将锁释放掉 保证删除原子性采用lua脚本
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
redisTemplate.execute(redisScript, Arrays.asList(lockKey), uuid);
return skuInfo;
} else {
//2.3.2 获取锁失败,自旋等待下次获取
Thread.sleep(100);
return this.getSkuInfoRedis(skuId);
}
} else {
//命中缓存直接返回即可
return skuInfo;
}
} catch (Exception e) {
e.printStackTrace();
}
//兜底方案:查询数据库
return getSkuInfoDB(skuId);
}
在实现类添加
/**
* 根据SkuID查询SKU商品信息包含图片列表
*
* @param skuId
* @return
*/
@Override
public SkuInfo getSkuInfo(Long skuId) {
//改造前从数据库直接查询
//SkuInfo skuInfo = getSkuInfoDB(skuId);
//改造后:缓存+分布式锁(RedisTemplate方式)
//SkuInfo skuInfo = getSkuInfoRedis(skuId);
//改造后:缓存+分布式锁(Redisson方式)
SkuInfo skuInfo = getSkuInfoRedisson(skuId);
return skuInfo;
}
/**
* 采用Redisson提供实现分布锁
* 优先从缓存中获取商品信息,缓存未命中,避免出现缓存击穿,分布式锁
*
* @param skuId
* @return
*/
private SkuInfo getSkuInfoRedisson(Long skuId) {
try {
//1.优先从缓存中获取数据,如果命中缓存则直接返回,未命中-采用分布式锁避免缓存击穿
//1.1 构建商品商品信息Key 形式:sku:29:info
String skuKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKUKEY_SUFFIX;
//1.2 从缓存中获取商品信息 结果 命中缓存:直接返回 未命中 执行第二步
SkuInfo skuInfo = (SkuInfo) redisTemplate.opsForValue().get(skuKey);
if (skuInfo == null) {
//2.尝试获取锁 获取锁成功-执行数据库查询,有值:将查询结果放入缓存 没值:缓存空对象(暂存)
//2.1 为每个商品声明锁的Key 形式:sku:29:lock
String lockKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKULOCK_SUFFIX;
//2.2创建可重入锁对象
RLock lock = redissonClient.getLock(lockKey);
//2.3尝试获取锁 结果 获取锁成功:执行业务(查询数据库 有值:放入缓存 没值:缓存空对象) 释放锁
boolean flag = lock.tryLock(RedisConst.SKULOCK_EXPIRE_PX1, RedisConst.SKULOCK_EXPIRE_PX2, TimeUnit.SECONDS);
//2.3.1 获取锁成功 查库缓存 释放锁
if (flag) {
try {
//2.3.2 查询数据库
skuInfo = this.getSkuInfoDB(skuId);
//2.3.2.1 没值 缓存空对象
if (skuInfo == null) {
redisTemplate.opsForValue().set(skuKey, skuInfo, RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
return skuInfo;
}
//2.3.2.2 有值:将数据放入缓存
redisTemplate.opsForValue().set(skuKey, skuInfo, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
//2.3.3 释放锁
lock.unlock();
}
return skuInfo;
} else {
//2.2.2 获取锁失败 自旋下次重试
Thread.sleep(200);
return this.getSkuInfoRedisson(skuId);
}
} else {
//命中缓存则直接返回
return skuInfo;
}
} catch (Exception e) {
e.printStackTrace();
}
//兜底方案:查询数据库
return this.getSkuInfoDB(skuId);
}
/**
* 根据SkuID查询SKU商品信息包含图片列表
*
* @param skuId
* @return
*/
@Override
public SkuInfo getSkuInfo(Long skuId) {
//改造前从数据库直接查询
//SkuInfo skuInfo = getSkuInfoDB(skuId);
//改造后:缓存+分布式锁(RedisTemplate方式)
//SkuInfo skuInfo = getSkuInfoRedis(skuId);
//改造后:缓存+分布式锁(Redisson方式)
SkuInfo skuInfo = getSkuInfoRedisson(skuId);
return skuInfo;
}
随着业务中缓存及分布式锁的加入,业务代码变的复杂起来,除了需要考虑业务逻辑本身,还要考虑缓存及分布式锁的问题,增加了程序员的工作量及开发难度。而缓存的玩法套路特别类似于事务,而声明式事务就是用了aop的思想实现的。
以 @Transactional 注解为植入点的切点,这样才能知道@Transactional注解标注的方法需要被代理。
@Transactional注解的切面逻辑类似于@Around
模拟事务,缓存可以这样实现:
自定义缓存注解@GmallCache(类似于事务@Transactional)
在service-util
模块中定义缓存注解
package com.atguigu.gmall.common.cache;
import java.lang.annotation.*;
/**
* 自定义注解
* 作用:优先从缓存中获取数据+避免缓存击穿采用分布式锁
* 用来修饰注解的注解称为:元注解
* 1.Target 注解可以被修饰位置 TYPE:类 METHOD:方法 FIELD:属性
* 2.Retention 注解生命周期
* 3.Inherited 注解是否可以被继承
* 4.Documented 产生文档
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface GmallCache {
/**
* 缓存数据前缀
*
* @return
*/
String prefix() default "cache:";
/**
* 缓存数据后缀
*
* @return
*/
String suffix() default ":info";
}
Spring aop 参考文档:
https://docs.spring.io/spring-framework/docs/5.3.9-SNAPSHOT/reference/html/core.html#aop
package com.atguigu.gmall.common.cache;
import com.alibaba.fastjson.JSON;
import com.atguigu.gmall.common.constant.RedisConst;
import com.sun.org.apache.regexp.internal.RE;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 缓存+分布式锁切面类
*
* @author: atguigu
* @create: 2023-01-03 09:33
*/
@Component
@Aspect
@Slf4j
public class GmallCacheAspect {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
/**
* 缓存+分布式锁切面逻辑
*
* @param joinPoint 切入点(真正被增强位置)包含目标方法各种信息
* @return
*/
@SneakyThrows //lombok提供处理异常try catch
@Around("@annotation(com.atguigu.gmall.common.cache.GmallCache)")
public Object cacheAroundAdvice(ProceedingJoinPoint joinPoint) {
Object resultObject = null;
//一. 前置通知
try {
log.info("前置通知");
//1.优先从缓存中获取数据,如果命中缓存则直接返回,未命中-采用分布式锁避免缓存击穿
//1.1 声明缓存数据key 形式:前缀+数据ID+后缀
//1.1.1 通过切入点对象joinPoint获取相关注解
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
//1.1.2 获取方法对象,从方法上获取注解 从注解中获取属性值
GmallCache gmallCache = methodSignature.getMethod().getAnnotation(GmallCache.class);
//1.1.2 从切入点对象中获取目标方法参数
Object[] args = joinPoint.getArgs();
String argsStr = Arrays.asList(args).stream().map(arg -> {
return arg.toString();
//将切入点方法如果存在多参数用|拼接
}).collect(Collectors.joining("|"));
String dataKey = gmallCache.prefix() + argsStr + gmallCache.suffix();
//1.2 从缓存中获取数据 结果 命中:直接返回 未命中:执行2.获取锁方法
resultObject = this.cacheHit(dataKey, methodSignature);
if (resultObject == null) {
//2.尝试获取锁 获取锁成功-执行数据库查询,有值:将查询结果放入缓存 没值:缓存空对象(暂存)
//2.1 为每个业务数据声明锁的Key 形式:前缀+数据ID+后缀
String lockKey = gmallCache.prefix() + argsStr + RedisConst.SKULOCK_SUFFIX;
//2.2 创建可重入锁对象
RLock lock = redissonClient.getLock(lockKey);
//2.3 尝试获取锁 结果 获取到锁:执行目标方法(查询数据)将查询结果加入缓存 没有获取到锁:自旋
boolean flag = lock.tryLock(RedisConst.SKULOCK_EXPIRE_PX1, RedisConst.SKULOCK_EXPIRE_PX2, TimeUnit.SECONDS);
if (flag) {
try {
//二. 执行目标方法
//2.3.1 获取锁成功 执行查询数据 将数据放入缓存 释放锁
resultObject = joinPoint.proceed(joinPoint.getArgs());
if (resultObject == null) {
//2.3.1.1 库中不存在数据,缓存空对象,返回空对象
redisTemplate.opsForValue().set(dataKey, JSON.toJSONString(resultObject), RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
return resultObject;
} else {
//2.3.1.2 将数据放入缓存 释放锁 返回数据
redisTemplate.opsForValue().set(dataKey, JSON.toJSONString(resultObject), RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
return resultObject;
}
} finally {
//2.3.2 释放锁
lock.unlock();
}
} else {
//2.3.2 没有获取到锁:自旋
Thread.sleep(100);
return this.cacheAroundAdvice(joinPoint);
}
//2.4 将锁释放
} else {
// 命中缓存直接返回即可
return resultObject;
}
} catch (Throwable throwable) {
throwable.printStackTrace();
}
//三.后置通知
log.info("后置通知");
return joinPoint.proceed(joinPoint.getArgs());
}
/**
* 从Redis缓存中获取数据
*
* @param dataKey 缓存业务数据key
* @param methodSignature 切入点方法
*/
private Object cacheHit(String dataKey, MethodSignature methodSignature) {
//1.直接从redis中获取业务数据 缓存中无论是List 对象 都存JSON字符串 可能:SkuInfo List<SpuPoster>
String dataStr = (String) redisTemplate.opsForValue().get(dataKey);
if (StringUtils.isNotBlank(dataStr)) {
//2.对缓存结果进行判断
//2.1命中缓存 将JSON字符串 反序列化为对应类型
Class returnType = methodSignature.getReturnType();
return JSON.parseObject(dataStr, returnType);
}
//2.2没有命中
return null;
}
}
/**
* 根据SkuID查询SKU商品信息包含图片列表
*
* @param skuId
* @return
*/
@Override
@GmallCache(prefix = "sku:", suffix = ":info")
public SkuInfo getSkuInfo(Long skuId) {
return getSkuInfoDB(skuId);
}
/**
* 通过三级分类id查询分类信息
* @param category3Id
* @return
*/
@Override
@GmallCache(prefix = "categoryView:")
public BaseCategoryView getCategoryViewByCategory3Id(Long category3Id) {
....
}
/**
* 根据商品SPUID查询商品海报列表
*
* @param spuId 商品SPUID
* @return
*/
@Override
@GmallCache(prefix = "spuPosterBySpuId:")
public List<SpuPoster> findSpuPosterBySpuId(Long spuId) {
// select * from spu_poster where spu_id = spuId;
return spuPosterMapper.selectList(new QueryWrapper<SpuPoster>().eq("spu_id",spuId));
}
/**
* 根据SKUID查询当前SKU商品中平台属性以及属性值
*
* @param skuId
* @return
*/
@Override
@GmallCache(prefix = "attrList:")
public List<BaseAttrInfo> getAttrList(Long skuId) {
/**
* 根据SPU_ID,SKU_ID查询所有销售属性,标识SKU选中销售属性
*
* @param skuId
* @param spuId
* @return
*/
@Override
@GmallCache(prefix = "spuSaleAttrListCheckBySku:")
public List<SpuSaleAttr> getSpuSaleAttrListCheckBySku(Long skuId, Long spuId) {
....
}
/**
* 返回所有销售属性对应skuID map集合
*
* @param spuId
* @return
*/
@Override
@GmallCache(prefix = "skuValueIdsMap:")
public Map getSkuValueIdsMap(Long spuId) {
....
}
/**
* 根据商品实时最新价格
*
* @param skuId 商品ID
* @return
*/
@Override
public BigDecimal getSkuPrice(Long skuId) {
//1.避免出现缓存击穿
//1.1 构建锁的key
String lockKey = "sku:price:" + skuId + ":lock";
//1.2 创建锁对象
RLock lock = redissonClient.getLock(lockKey);
try {
//1.3 获取锁
lock.lock();
//select price from sku_info where id = 29;
LambdaQueryWrapper<SkuInfo> queryWrapper = new LambdaQueryWrapper<>();
//设置查询条件
queryWrapper.eq(SkuInfo::getId, skuId);
//设置查询字段
queryWrapper.select(SkuInfo::getPrice);
SkuInfo skuInfo = skuInfoMapper.selectOne(queryWrapper);
if (skuInfo != null) {
return skuInfo.getPrice();
}
return new BigDecimal("0");
} finally {
//1.4 释放锁
lock.unlock();
}
}
布隆过滤器(Bloom Filter),是1970年,由一个叫布隆的小伙子提出的,距今已经五十年了。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。二进制大家应该都清楚,存储的数据不是0就是1,默认是0。
主要用于判断一个元素是否在一个集合中,0代表不存在某个数据,1代表存在某个数据。
总结: 判断一个元素一定不存在 或者 可能存在! 存在一定的误判率{通过代码调节}
Bit 数组:
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
---|---|---|---|---|---|---|---|---|
大数据量的时候, 判断一个元素是否在一个集合中。解决缓存穿透问题
存入过程
布隆过滤器上面说了,就是一个二进制数据的集合。当一个数据加入这个集合时,经历如下:
通过K个哈希函数计算该数据,返回K个计算出的hash值
这些K个hash值映射到对应的K个二进制的数组下标
将K个下标对应的二进制数据改成1。
如图所示:
查询过程
布隆过滤器主要作用就是查询一个数据,在不在这个二进制的集合中,查询过程如下:
1、通过K个哈希函数计算该数据,对应计算出的K个hash值
2、通过hash值找到对应的二进制的数组下标
3、判断:如果存在一处位置的二进制数据是0,那么该数据不存在。如果都是1,该数据可能存在集合中。
优点
由于存储的是二进制数据,所以占用的空间很小
它的插入和查询速度是非常快的,时间复杂度是O(K),空间复杂度:O (M)。
K: 是哈希函数的个数
M: 是二进制位的个数
保密性很好,因为本身不存储任何原始数据,只有二进制数据
缺点
例如图中的“张三”和“张三丰”,假如最终算出hash值相同,那么他们会将同一个下标的二进制数据改为1。
这个时候,你就不知道下标为1的二进制,到底是代表“张三”还是“张三丰”。
由此得出如下缺点:
一、存在误判
假如上面的图没有存 "张三",只存了 "张三丰",那么用"张三"来查询的时候,会判断"张三"存在集合中。
因为“张三”和“张三丰”的hash值是相同的,通过相同的hash值,找到的二进制数据也是一样的,都是1。
误判率:
受三个因素影响: 二进制位的个数m, 哈希函数的个数k, 数据规模n (添加到布隆过滤器中的函数)
已知误判率p, 数据规模n, 求二进制的个数m,哈希函数的个数k {m,k 程序会自动计算 ,你只需要告诉我数据规模,误判率就可以了}
ln: 自然对数是以常数e为底数的对数,记作lnN(N>0)。在物理学,生物学等自然科学中有重要的意义,一般表示方法为lnx。数学中也常见以logx表示自然对数。
二、删除困难
还是用上面的举例,因为“张三”和“张三丰”的hash值相同,对应的数组下标也是一样的。
如果你想去删除“张三”,将下标为1里的二进制数据,由1改成了0。
那么你是不是连“张三丰”都一起删了呀。
RedisConst 常量类中添加,使用Redis中数据结构:bitmap
// 布隆过滤器使用!
public static final String SKU_BLOOM_FILTER="sku:bloom:filter";
操作模块:service-product
修改启动类
package com.atguigu.gmall;
import com.atguigu.gmall.common.constant.RedisConst;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.ComponentScan;
/**
* @author: atguigu
* @create: 2022-12-21 14:39
*/
@SpringBootApplication
@EnableDiscoveryClient
//@ComponentScan(basePackages = {"com.xxx"})
public class ProductApp implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(ProductApp.class, args);
}
@Autowired
private RedissonClient redissonClient;
/**
* Springboot应用初始化后会执行一次该方法
* @param args
* @throws Exception
*/
@Override
public void run(String... args) throws Exception {
//初始化布隆过滤器
RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER);
//设置数据规模 误判率 预计统计元素数量为100000,期望误差率为0.01
bloomFilter.tryInit(100000, 0.01);
}
}
SpringBoot的CommandLineRunner接口主要用于实现在应用初始化后,去执行一段代码块逻辑,这段初始化代码在整个应用生命周期内只会执行一次。
1、查看商品详情页添加布隆过滤器
操作模块:service-item
更改ItemServiceImpl.getBySkuId方法
@Autowired
private RedissonClient redissonClient;
/**
* 远程调用商品微服务:根据skuID汇总sku商品详情页所有数据
*
* @param skuId
* @return
*/
@Override
public Map<String, Object> getBySkuId(Long skuId) {
//0.创建响应结果Map
Map<String, Object> result = new HashMap<>();
//远程调用商品微服务7个接口之前 提前知道用户访问商品SKUID是否存在与布隆过滤器
RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER);
if (!bloomFilter.contains(skuId)) {
log.error("用户查询商品sku不存在:{}", skuId);
//查询数据不存在直接返回空对象
return result;
}
...
}
2、添加商品sku加入布隆过滤器数据
操作模块:service-product
更改SkuManageServiceImpl.saveSkuInfo方法
/**
* 保存SKU信息
* 1.将SKU基本信息存入sku_info表中
* 2.将提交SKU图片存入sku_image表 关联SKU 设置sku_id逻辑外键
* 3.将提交的平台属性列表 批量保存 sku_attr_value 关联SKU 设置sku_id逻辑外键
* 4.将提交的销售属性列表 批量保存 sku_sale_attr_value 关联SKU 设置sku_id逻辑外键
*
* @param skuInfo SKU相关信息
*/
public void saveSkuInfo(SkuInfo skuInfo) {
/*
将数据插入数据库表!
skuInfo: 库存单元表! 记录是哪个spu下的sku!
skuImage: 库存单元图片表!
skuSaleAttrValue; sku 与 销售属性值中间表!
skuAttrValue: sku 与平台属性值中间表!
*/
skuInfoMapper.insert(skuInfo);
...
//将新增的商品SKUID存入布隆过滤器
//5. 获取布隆过滤器,将新增skuID存入布隆过滤器
RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter(RedisConst.SKU_BLOOM_FILTER);
bloomFilter.add(skuInfo.getId());
}