谷粒随享
学习目标:
一个看起来很简单的应用,背后可能需要数十或数百个服务来支撑,一个请求就要多次服务调用。当请求变慢、或者不能使用时,我们是不知道是哪个后台服务引起的。这时,我们使用 Zipkin 就能解决这个问题。由于业务访问量的增大,业务复杂度增加,以及微服务架构和容器技术的兴起,要对系统进行各种拆分。微服务系统拆分后,我们可以使用 Zipkin 链路,来快速定位追踪有故障的服务点。
Zipkin 是一款开源的分布式实时数据追踪系统(Distributed Tracking System),能够收集服务间调用的时序数据,提供调用链路的追踪。
Zipkin 其主要功能是聚集来自各个异构系统的实时监控数据,在微服务架构下,十分方便地用于服务响应延迟等问题的定位。
Zipkin 每一个调用链路通过一个 trace id 来串联起来,只要你有一个 trace id ,就能够直接定位到这次调用链路,并且可以根据服务名、标签、响应时间等进行查询,过滤那些耗时比较长的链路节点。
Zipkin 分布式跟踪系统就能非常好地解决该问题,主要解决以下3点问题:
在优化前通过Zipkin链路追踪进行查看接口耗时
在service
微服务父工程pom.xml中新增Zipkin相关依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-brave</artifactId>
</dependency>
在Nacos配置common.yaml
中新增配置(已完成)
spring:
zipkin:
base-url: http://192.168.200.6:9411
discovery-client-enabled: false
sender:
type: web
management:
zipkin:
tracing:
endpoint: http://192.168.200.6:9411/api/v2/spans
tracing:
sampling:
probability: 1.0 # 记录速率100%
访问专辑详情接口、或查看专辑页面进行测试
虽然咱们实现了页面需要的功能,但是考虑到该页面是被用户高频访问的,所以性能需要优化。一般一个系统最大的性能瓶颈,就是数据库的io操作。从数据库入手也是调优性价比最高的切入点。
一般分为两个层面:
重点要讲的是第二个层面:尽量避免直接查询数据库。
解决办法就是:缓存
缓存最常见的3个问题:
缓存穿透: 是指查询一个不存在的数据,由于缓存无法命中,将去查询数据库,但是数据库也无此记录,并且出于容错考虑,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。
缓存雪崩:是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
缓存击穿: 是指对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:如果这个key在大量请求同时进来之前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。
与缓存雪崩的区别:
之前,我们学习过synchronized 及lock锁,这些锁都是本地锁。接下来写一个案例,演示本地锁的问题
在service-album
微服务中添加 TestController 控制器:
package com.atguigu.tingshu.album.api;
import com.atguigu.tingshu.album.service.TestService;
import com.atguigu.tingshu.common.result.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author atguigu
* @ClassName TestController
* @description: TODO
* @date 2023年08月22日
* @version: 1.0
*/
@RestController
@RequestMapping("api/album/test")
public class TestController {
// 注入服务层方法
@Autowired
private TestService testService;
/**
* 测试分布式锁
* @return
*/
@GetMapping("testLock")
public Result testLock(){
testService.testLock();
return Result.ok();
}
}
TestService接口:
public interface TestService {
/**
* 测试分布式锁
*/
void testLock();
}
TestServiceImpl实现:
package com.atguigu.tingshu.album.service.impl;
import com.atguigu.tingshu.album.service.TestService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* @author: atguigu
* @create: 2023-10-07 08:54
*/
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public void testLock() {
try {
//1.从Redis缓存中获取key="num"的值 保证redis中存在"num"(手动提前在redis中创建key)
String value = stringRedisTemplate.opsForValue().get("num");
if (StringUtils.isBlank(value)) {
return;
}
//2.对获取到值进行+1操作
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
}
启动三个微服务端口号分别为 8501、8601、8701 然后通过网关复杂均衡访问控制器
注释掉nacos配置中端口,在本地bootstrap.properties中配置默认端口,避免端口被覆盖。导致端口冲突
测试本地锁:
在linux 系统中安装 ab 压力测试工具 httpd-tools(已安装)
yum install -y httpd-tools
命令语法:ab -n(一次发送的请求数) -c(请求的并发数) 访问路径 注意:将Windows防火墙关闭
ab -n 5000 -c 100 http://192.168.200.1:8500/api/album/test/testLock
发现结果不是5000 ,说明本地锁是锁不住资源的;因此要使用分布式锁!
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁主流的实现方案:
每一种分布式锁解决方案都有各自的优缺点:
多个客户端同时获取锁(setnx)
获取成功,执行业务逻辑{从db获取数据,放入缓存,执行完成释放锁(del)}
其他客户端等待重试
/**
* 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
*/
@Override
public void testLock() {
//0.先尝试获取锁 setnx key val
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock");
if(flag){
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放
stringRedisTemplate.delete("lock");
}else{
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
进行测试:运行结果应该是5000。
可能出现的问题:
问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放
解决:设置过期时间,自动释放锁。
将上锁代码改为:
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 3, TimeUnit.SECONDS);
测试结果应该是5000
问题:可能会释放其他服务器的锁。
场景:如果业务逻辑的执行时间是7s。执行流程如下
最终等于没锁的情况。
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁。
修改代码:
@Override
public synchronized void testLock() {
// 声明uuid
String uuid = UUID.randomUUID().toString();
// 将uuid 存储到缓存中
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
if (flag){
try {
//1.从Redis缓存中获取key="num"的值 保证redis中存在"num"(手动提前在redis中创建key)
String value = stringRedisTemplate.opsForValue().get("num");
if (StringUtils.isBlank(value)) {
return;
}
//2.对获取到值进行+1操作
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
// 判断uuid 是否与缓存中的数据相等
if (uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){
// 如果相等,删除锁
stringRedisTemplate.delete("lock");
}
} catch (NumberFormatException e) {
e.printStackTrace();
}
}else {
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
问题:删除操作缺乏原子性。
场景:
index1执行删除前,lock刚好过期时间已到,被Redis自动释放 在Redis中没有了锁。
index2获取了lock,index2线程获取到了cpu的资源,开始执行方法
index1执行删除,此时会把index2的lock删除
index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行删除的是index2的锁!
解决方案:使用lua脚本保证删除锁具有原子性操作
/**
* 采用SpringDataRedis实现分布式锁
* 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)
*/
@Override
public void testLock() {
//0.先尝试获取锁 setnx key val
//问题:锁可能存在线程间相互释放
//Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 10, TimeUnit.SECONDS);
//解决:锁值设置为uuid
String uuid = UUID.randomUUID().toString();
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);
if(flag){
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = stringRedisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));
//4.将锁释放 判断uuid
//问题:删除操作缺乏原子性。
//if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){ //线程一:判断是满足是当前线程锁的值
// //条件满足,此时锁正好到期,redis锁自动释放了线程2获取锁成功,线程1将线程2的锁删除
// stringRedisTemplate.delete("lock");
//}
//解决:redis执行lua脚本保证原子,lua脚本执行会作为一个整体执行
//执行脚本参数 参数1:脚本对象封装lua脚本,参数二:lua脚本中需要key参数(KEYS[i]) 参数三:lua脚本中需要参数值 ARGV[i]
//4.1 先创建脚本对象 DefaultRedisScript泛型脚本语言返回值类型 Long 0:失败 1:成功
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
//4.2设置脚本文本
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
redisScript.setScriptText(script);
//4.3 设置响应类型
redisScript.setResultType(Long.class);
stringRedisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
}else{
try {
//睡眠
Thread.sleep(100);
//自旋重试
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
总结:
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
官方文档地址:https://github.com/Redisson/Redisson/wiki
实现步骤:
在service-util
模块中解开redisson相关依赖(注释放开)
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
在service-util
模块中添加配置类
package com.atguigu.tingshu.common.config.redis;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* redisson配置信息
*/
@Data
@Configuration
@ConfigurationProperties("spring.data.redis")
public class RedissonConfig {
private String host;
private String password;
private String port;
private int timeout = 3000;
private static String ADDRESS_PREFIX = "redis://";
/**
* 自动装配
*/
@Bean
RedissonClient redissonSingle() {
Config config = new Config();
if (StringUtils.isBlank(host)) {
throw new RuntimeException("host is empty");
}
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout);
if (StringUtils.isNotBlank(this.password)) {
serverConfig.setPassword(this.password);
}
return Redisson.create(config);
}
}
优化代码
package com.atguigu.tingshu.album.service.impl;
import com.alibaba.cloud.commons.lang.StringUtils;
import com.atguigu.tingshu.album.service.TestService;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* @author atguigu
* @ClassName TestServiceImpl
* @description: TODO
* @date 2023年08月22日
* @version: 1.0
*/
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
@Override
public void testLock() {
try {
//先尝试获取分布式锁 1.创建锁对象 2.调用获取锁方法
RLock lock = redissonClient.getLock("lock");
//获取锁成功后,才执行业务
lock.lock(); //阻塞到获取锁成功为止
//lock.tryLock(5, TimeUnit.SECONDS); //参数1:等待锁获取最大时间 参数2:时间单位 获取锁失败返回false
//lock.tryLock(3, 5, TimeUnit.SECONDS);//参数1:等待锁获取最大时间 参数2:锁过期时间,参数3:时间单位 获取锁失败返回false
try {
//1.从Redis缓存中获取key="num"的值 保证redis中存在"num"(手动提前在redis中创建key)
String value = redisTemplate.opsForValue().get("num");
if (StringUtils.isBlank(value)) {
return;
}
//2.对获取到值进行+1操作
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
} finally {
//业务执行完毕释放锁
lock.unlock();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
分布式锁:
ab -n 5000 -c 100 http://192.168.200.1:8500/api/album/test/testLock
@Autowired
private RedisTemplate redisTemplate;
/***
* 根据专辑ID查询专辑信息包含专辑标签列表
* 使用自定义分布式锁:解决缓存击穿
* @param id 专辑ID
* @return
*/
@Override
public AlbumInfo getAlbumInfo(Long id) {
try {
//1.优先从缓存中获取数据
//1.1 构建业务数据key 形式:前缀+主键ID
String dataKey = RedisConstant.ALBUM_INFO_PREFIX + id;
//1.2 从Redis中查询业务数据
AlbumInfo albumInfo = (AlbumInfo) redisTemplate.opsForValue().get(dataKey);
//1.3 命中缓存则直接返回即可,未命中则执行获取分布式锁逻辑
if (albumInfo != null) {
return albumInfo;
}
//2.尝试获取分布式锁
//2.1 构建锁Key 形式:前缀+主键ID+后缀
String lockKey = RedisConstant.ALBUM_INFO_PREFIX + id + RedisConstant.CACHE_LOCK_SUFFIX;
//2.2 构建锁Val UUID
String lockVal = IdUtil.fastSimpleUUID();
//2.3 采用set EX NX 实现分布式锁获取
Boolean flag = redisTemplate.opsForValue().setIfAbsent(lockKey, lockVal, RedisConstant.ALBUM_LOCK_EXPIRE_PX2, TimeUnit.SECONDS);
if (flag) {
try {
//2.3 如果获取锁成功则执行业务
//3.执行业务(查询数据库、业务数据放入缓存)
albumInfo = this.getAlbumInfoFromDB(id);
//3.1 查询数据库为空:将空结果放入Redis 暂存10分钟
//3.2 查询数据库有值:将业务数据放入Redis 存1小时
int randomTTL = RandomUtil.randomInt(200, 1000);
long ttl = albumInfo == null ? RedisConstant.ALBUM_TEMPORARY_TIMEOUT : RedisConstant.ALBUM_TIMEOUT + randomTTL;
redisTemplate.opsForValue().set(dataKey, albumInfo, ttl, TimeUnit.SECONDS);
return albumInfo;
} finally {
//4.释放分布式锁
//4.1 提供释放锁lua脚本
String text = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(text);
redisScript.setResultType(Long.class);
//4.2 执行脚本
redisTemplate.execute(redisScript, Arrays.asList(lockKey), lockVal);
}
} else {
//2.4 如果获取锁失败:自旋
Thread.sleep(200);
return this.getAlbumInfo(id);
}
} catch (Exception e) {
//兜底处理:如果Redis锁服务不可用(Redis宕机了) ,直接查询数据库
log.error("[专辑服务]获取专辑信息:{}", e);
//throw new RuntimeException(e);
return this.getAlbumInfoFromDB(id);
}
}
/***
* 根据专辑ID查询专辑信息包含专辑标签列表
* @param id 专辑ID
* @return
*/
@Override
public AlbumInfo getAlbumInfoFromDB(Long id) {
//1.根据主键查询专辑信息
AlbumInfo albumInfo = albumInfoMapper.selectById(id);
//2.根据专辑ID查询专辑标签列表
if (albumInfo != null) {
LambdaQueryWrapper<AlbumAttributeValue> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(AlbumAttributeValue::getAlbumId, id);
List<AlbumAttributeValue> albumAttributeValueList = albumAttributeValueMapper.selectList(queryWrapper);
albumInfo.setAlbumAttributeValueVoList(albumAttributeValueList);
}
return albumInfo;
}
/***
* 根据专辑ID查询专辑信息包含专辑标签列表
* 使用Redisson提供分布式锁:解决缓存击穿
* @param id 专辑ID
* @return
*/
@Override
public AlbumInfo getAlbumInfo(Long id) {
try {
//1.优先从缓存中获取数据
//1.1 构建业务数据key 形式:前缀+主键ID
String dataKey = RedisConstant.ALBUM_INFO_PREFIX + id;
//1.2 从Redis中查询业务数据
AlbumInfo albumInfo = (AlbumInfo) redisTemplate.opsForValue().get(dataKey);
//1.3 命中缓存则直接返回即可,未命中则执行获取分布式锁逻辑
if (albumInfo != null) {
return albumInfo;
}
//2.获取分布式锁
//2.1 声明锁名称 锁粒度设置小=为不同专辑设置不同锁
String lockKey = RedisConstant.ALBUM_INFO_PREFIX + id + RedisConstant.CACHE_LOCK_SUFFIX;
//2.2 基于RedissonClient创建锁对象 入参锁名称就是存入Redis中锁Hash的key
RLock lock = redissonClient.getLock(lockKey);
//2.3 调用获取分布式锁方法-如果需要看门狗lock() 入参中leaseTime设置-1
lock.lock();
//3.执行业务
try {
//3.1 再次查询一次缓存:如果有大量线程已经调用lock方法,这些调用了lock方法线程最终都会获取锁成功,避免获取锁成功线程重复执行查询业务
albumInfo = (AlbumInfo) redisTemplate.opsForValue().get(dataKey);
if (albumInfo != null) {
return albumInfo;
}
//3.2 执行查询数据库业务数据
albumInfo = this.getAlbumInfoFromDB(id);
//3.3 将查询结果放入缓存
int randomTTL = RandomUtil.randomInt(200, 1000);
long ttl = albumInfo == null ? RedisConstant.ALBUM_TEMPORARY_TIMEOUT : RedisConstant.ALBUM_TIMEOUT + randomTTL;
redisTemplate.opsForValue().set(dataKey, albumInfo, ttl, TimeUnit.SECONDS);
return albumInfo;
} finally {
//4.释放分布式锁
lock.unlock();
}
} catch (Exception e) {
//兜底处理:如果Redis锁服务不可用(Redis宕机了) ,直接查询数据库
log.error("[专辑服务]获取专辑信息:{}", e);
//throw new RuntimeException(e);
return this.getAlbumInfoFromDB(id);
}
}
专辑详情中多项数据接口需要查询缓存,那么分布式锁的业务逻辑代码就会出现大量的重复。因此,我们可以参考Spring框架事务注解来实现简化代码。只要类上添加了一个注解,那么这个注解就会自带分布式锁的功能。
实现如下:
service-util
模块中添加一个注解package com.atguigu.tingshu.common.cache;
import java.lang.annotation.*;
/**
* 自定义缓存注解:1.缓存业务数据 2.避免缓存击穿
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface GuiGuCache {
/**
* 业务缓存Key前缀
* @return
*/
String prefix() default "data:";
/**
* 业务缓存Key后缀
* @return
*/
String suffix() default ":info";
}
package com.atguigu.tingshu.common.cache;
import cn.hutool.core.util.RandomUtil;
import com.atguigu.tingshu.common.constant.RedisConstant;
import com.fasterxml.jackson.annotation.JacksonAnnotationsInside;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.filefilter.MagicNumberFileFilter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author: atguigu
* @create: 2023-11-27 10:48
*/
@Slf4j
@Aspect
@Component
public class GuiGuCacheAspect {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
/**
* 自定义注解环绕通知
*
* @param joinPoint
* @param guiGuCache
* @return
*/
@SneakyThrows
@Around("@annotation(guiGuCache)")
public Object cacheApect(ProceedingJoinPoint joinPoint, GuiGuCache guiGuCache) {
try {
Object object = new Object();
//1.优先从缓存中获取业务数据
//1.1 构建业务数据Key 根据调用目标方法动态拼接业务key 前缀+方法参数+后缀
String param = "none";
Object[] args = joinPoint.getArgs();
if (args != null && args.length > 0) {
param = Arrays.asList(args).stream()
.map(arg -> arg.toString())
.collect(Collectors.joining(":"));
}
String dataKey = guiGuCache.prefix() + param + guiGuCache.suffix();
//1.2 查询缓存中业务数据
object = redisTemplate.opsForValue().get(dataKey);
//1.3 如果有直接返回,如果没有则执行获取分布式锁
if (object != null) {
return object;
}
//2.获取分布式锁
//2.1 构建分布式锁名称 前缀+方法参数+锁后缀
String lockKey = guiGuCache.prefix() + param + RedisConstant.CACHE_LOCK_SUFFIX;
//2.2 基于RedissonClient对象创建锁对象
RLock lock = redissonClient.getLock(lockKey);
//2.3 调用获取分布式锁方法-如果需要看门狗lock() 入参中leaseTime设置-1
lock.lock();
//3.执行目标方法(查询缓存、查询数据库、将查询结果放入缓存)
try {
//3.1 再次查询一次缓存:如果有大量线程已经调用lock方法,这些调用了lock方法线程最终都会获取锁成功,避免获取锁成功线程重复执行查询业务
object = redisTemplate.opsForValue().get(dataKey);
if (object != null) {
return object;
}
//3.2 在执行目标方法(查询数据库方法)
object = joinPoint.proceed();
//3.3 将查库结果放入缓存
int randomTTL = RandomUtil.randomInt(200, 1000);
long ttl = object == null ? RedisConstant.ALBUM_TEMPORARY_TIMEOUT : RedisConstant.ALBUM_TIMEOUT + randomTTL;
redisTemplate.opsForValue().set(dataKey, object, ttl, TimeUnit.SECONDS);
return object;
} finally {
//4.释放锁
lock.unlock();
}
} catch (Throwable e) {
//兜底处理:如果Redis锁服务不可用(Redis宕机了) ,直接查询数据库
log.error("[缓存切面]异常信息:{}", e);
return joinPoint.proceed();
}
}
}
在这个实现类AlbumInfoServiceImpl中添加注解:
/***
* 根据专辑ID查询专辑信息包含专辑标签列表
* @param id 专辑ID
* @return
*/
@Operation(summary = "根据专辑ID查询专辑信息包含专辑标签列表")
@GetMapping("/albumInfo/getAlbumInfo/{id}")
public Result<AlbumInfo> getAlbumInfo(@PathVariable Long id) {
//AlbumInfo albumInfo = albumInfoService.getAlbumInfo(id);
AlbumInfo albumInfo = albumInfoService.getAlbumInfoFromDB(id);
return Result.ok(albumInfo);
}
/***
* 根据专辑ID查询专辑信息包含专辑标签列表
* @param id 专辑ID
* @return
*/
@Override
@GuiGuCache(prefix = RedisConstant.ALBUM_INFO_PREFIX)
public AlbumInfo getAlbumInfoFromDB(Long id) {...}
/**
* 根据专辑ID查询专辑统计信息
*
* @param albumId 专辑ID
* @return
*/
@Override
@GuiGuCache(prefix = "albumStatVo:")
public AlbumStatVo getAlbumStatVo(Long albumId){...}
在BaseCategoryServiceImpl实现类中添加注解:
/**
* 根据三级分类ID查询分类视图对象
*
* @param category3Id
* @return
*/
@Override
@GuiGuCache(prefix = "categoryView:")
public BaseCategoryView getCategoryView(Long category3Id){...}
/**
* 首页中展示指定1级分类下前7个三级分类集合
*
* @param category1Id:一级分类ID
* @return
*/
@Override
@GuiGuCache(prefix = "topBaseCategory3:")
public List<BaseCategory3> getTopBaseCategory3(Long category1Id){...}
/**
* 查询指定一级分类下包含二级分类列表(二级分类下三级分类列表
*
* @param category1Id 1级分类ID
* @return
*/
@Override
@GuiGuCache(prefix = "baseCategoryListByCategory1:")
public JSONObject getBaseCategoryListByCategory1Id(Long category1Id){...}
/**
* 查询所有分类(1、2、3级分类)在一级分类下包含二级,二级分类下包含三级
*
* @return
*/
@Override
@GuiGuCache(prefix = "allCategoryList")
public List<JSONObject> getBaseCategoryList(){...}
布隆过滤器(Bloom Filter),是1970年,由一个叫布隆的小伙子提出的,距今已经五十年了。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。二进制大家应该都清楚,存储的数据不是0就是1,默认是0。
主要用于判断一个元素是否在一个集合中,0代表不存在某个数据,1代表存在某个数据。
总结: 判断一个元素一定不存在 或者 可能存在! 存在一定的误判率{通过代码调节}
Bit 数组:
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
---|---|---|---|---|---|---|---|---|
大数据量的时候, 判断一个元素是否在一个集合中。解决缓存穿透问题
存入过程
布隆过滤器上面说了,就是一个二进制数据的集合。当一个数据加入这个集合时,经历如下:
通过K个哈希函数计算该数据,返回K个计算出的hash值
这些K个hash值映射到对应的K个二进制的数组下标
将K个下标对应的二进制数据改成1。
如图所示:
查询过程
布隆过滤器主要作用就是查询一个数据,在不在这个二进制的集合中,查询过程如下:
1、通过K个哈希函数计算该数据,对应计算出的K个hash值
2、通过hash值找到对应的二进制的数组下标
3、判断:如果存在一处位置的二进制数据是0,那么该数据不存在。如果都是1,该数据可能存在集合中。
优点
缺点
添加数据是通过计算数据的hash值,那么很有可能存在这种情况:两个不同的数据计算得到相同的hash值。
例如图中的“张三”和“张三丰”,假如最终算出hash值相同,那么他们会将同一个下标的二进制数据改为1。
这个时候,你就不知道下标为1的二进制,到底是代表“张三”还是“张三丰”。
一、存在误判
假如上面的图没有存 "张三",只存了 "张三丰",那么用"张三"来查询的时候,会判断"张三"存在集合中。
因为“张三”和“张三丰”的hash值是相同的,通过相同的hash值,找到的二进制数据也是一样的,都是1。
误判率:
受三个因素影响: 二进制位的个数m, 哈希函数的个数k, 数据规模n (添加到布隆过滤器中的函数)
已知误判率p, 数据规模n, 求二进制的个数m,哈希函数的个数k {m,k 程序会自动计算 ,你只需要告诉我数据规模,误判率就可以了}
ln: 自然对数是以常数e为底数的对数,记作lnN(N>0)。在物理学,生物学等自然科学中有重要的意义,一般表示方法为lnx。数学中也常见以logx表示自然对数。
二、删除困难
还是用上面的举例,因为“张三”和“张三丰”的hash值相同,对应的数组下标也是一样的。
如果你想去删除“张三”,将下标为1里的二进制数据,由1改成了0。
那么你是不是连“张三丰”都一起删了呀。
在service-search
模块的启动类中添加
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class ServiceSearchApplication implements CommandLineRunner {
@Autowired
private RedissonClient redissonClient;
public static void main(String[] args) {
SpringApplication.run(ServiceAlbumApplication.class, args);
}
/**
* Springboot应用初始化后会执行一次该方法
* @param args
* @throws Exception
*/
@Override
public void run(String... args) throws Exception {
//初始化布隆过滤器
RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER);
//设置数据规模 误判率 预计统计元素数量为100000,期望误差率为0.01
bloomFilter.tryInit(100000, 0.01);
}
}
service-search
模块中SearchServiceImpl上架专辑方法upperAlbum
@Autowired
private RedissonClient redissonClient;
/**
* 构建专辑索引库文档对象,新增专辑到索引库
*
* @param albumId
*/
@Override
public void upperAlbum(Long albumId) {
//....省略代码
//将新增的商品SKUID存入布隆过滤器
//获取布隆过滤器,将新增skuID存入布隆过滤器
RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER);
bloomFilter.add(albumInfo.getId());
}
service-search
模块中ItemServiceImpl汇总专辑方法getAlbumItem()
@Autowired
private RedissonClient redissonClient;
/**
* 汇总专辑详情相关信息
*
* @param albumId
* @return
*/
@Override
public Map<String, Object> getAlbumItem(Long albumId) {
RBloomFilter<Long> bloomFilter = redissonClient.getBloomFilter(RedisConstant.ALBUM_BLOOM_FILTER);
boolean contains = bloomFilter.contains(albumId);
if(!contains){
throw new GuiguException(ResultCodeEnum.DATA_NOT_FOUNT);
}
//.....省略代码
}