学习目标:
我们项目选择:XXL-JOB
官方文档:https://www.xuxueli.com/xxl-job/
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
源码仓库地址 | Release Download |
---|---|
https://github.com/xuxueli/xxl-job | Download |
http://gitee.com/xuxueli0323/xxl-job | Download |
当前项目使用版本:2.4.1-SNAPSHOT
注:为了统一版本,已统一下载,在资料中获取:xxl-job-master.zip
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${最新稳定版本}</version>
</dependency>
解压:xxl-job-master.zip,导入idea,或在idea里从https://gitee.com/xuxueli0323/xxl-job.git克隆到本地。如图:
项目结构说明:
xxl-job-master:
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
xxl-job-executor-sample-frameless:无框架版本;
获取 “调度数据库初始化SQL脚本” 并执行即可。
调度数据库初始化SQL脚本” 位置为:
/xxl-job-master/doc/db/tables_xxl_job.sql
调度中心项目:xxl-job-admin
作用:统一管理任务调度平台上调度任务,负责触发调度执行,并且提供任务管理平台。
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
调度中心访问地址:http://localhost:8080/xxl-job-admin
默认登录账号 “admin/123456”, 登录后运行界面如下图所示:
调度中心支持集群部署,提升调度系统容灾和可用性。
调度中心集群部署时,几点要求和建议:
“执行器”项目:xxl-job-executor-sample-springboot (提供多种版本执行器供选择,现以 springboot 版本为例,可直接使用,也可以参考其并将现有项目改造成执行器)
作用:负责接收“调度中心”的调度并执行;可直接部署执行器,也可以将执行器集成到现有业务项目中。
确认pom文件中引入了 “xxl-job-core” 的maven依赖;
<!-- xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.1-SNAPSHOT</version>
</dependency>
执行器配置,配置内容说明:
# web port
server.port=8081
# no web
#spring.main.web-environment=false
# log config
logging.config=classpath:logback.xml
### TODO:修改连接注册调度中心地址 xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### TODO:用于执行器注册授权token跟调度中心配置一致 xxl-job, access token
xxl.job.accessToken=default_token
### TODO:执行器名称,用于注册及集群 xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
# TODO:执行器netty服务端端口,避免本地集群启动冲突
xxl.job.executor.port=9999
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
执行器组件,配置内容说明:
package com.xxl.job.executor.core.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
启动:xxl-job-executor-sample-springboot
执行器支持集群部署,提升调度系统可用性,同时提升任务处理能力。
执行器集群部署时,几点要求和建议:
点击进入”执行器管理”界面, 如下图:
上面我们启动了xxl-job-executor-sample-springboot 执行器项目,当前已注册上来,我们执行使用改执行器。
执行器属性说明:
AppName: 是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;
名称: 执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性;排序: 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;
注册方式:调度中心获取执行器地址的方式;
自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址;
手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用;
机器地址:"注册方式"为"手动录入"时有效,支持人工维护执行器的地址信息;
登录调度中心:http://localhost:8080/xxl-job-admin
默认登录账号 “admin/123456”
任务管理 ==》 新增
添加成功,如图:
使用xxl-job-executor-sample-springboot项目job实例,与步骤二的JobHandler配置一致
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}
任务列表状态改变,如图:
设置断点,执行结果:
查看调度日志:
我们使用单独的一个微服务模块service-dispatch集成XXL-JOB执行器
该过程与2.4.4 配置部署“执行器项目类似
已引入,就忽略
<!-- xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.0</version>
</dependency>
注:当前远程maven仓库只更新到2.4.0,也可以把上面项目包安装到本地仓库,对于当前项目使用这两个版本无差异
xxl:
job:
admin:
# 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册
addresses: http://localhost:8080/xxl-job-admin
# 执行器通讯TOKEN [选填]:非空时启用
accessToken: default_token
executor:
# 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
appname: xxl-job-executor-sample
# 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
address:
# 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
ip:
# 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
port: 9999
# 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
logpath: /data/applogs/xxl-job/jobhandler
# 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
logretentiondays: 30
直接放入application.properties 配置文件中即可
# log config
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
xxl.job.accessToken=
xxl.job.executor.appname=dispatch-executor
xxl.job.executor.address=
xxl.job.executor.ip=
xxl.job.executor.port=9999
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30
注:如果已配置,忽略
将xxl-job-executor-sample-springboot 执行器项目的XxlJobConfig类复制过来
package com.atguigu.tingshu.dispatch.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
到处,我们已经将XXL-JOB集成到项目中了
编写测试任务
package com.atguigu.tingshu.dispatch.job;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DispatchJobHandler {
@XxlJob("firstJobHandler")
public void firstJobHandler() {
log.info("xxl-job项目集成测试");
}
}
在调度中心配置任务
启动任务,测试
前面我们讲过排行榜,为了减轻服务器压力,排行榜的数据要求不需要实时,我们可以1个小时或者1天更新一次排行榜都可以,当前以1个小时更新一次排行榜为例。
前面在service-search模块已提供更新排行榜接口,在此直接写远程调用的feign接口即可
操作模块:service-search-client
远程调用Feign接口
package com.atguigu.tingshu.search.client;
import com.atguigu.tingshu.common.result.Result;
import com.atguigu.tingshu.search.client.impl.SearchDegradeFeignClient;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
/**
* <p>
* 搜索模块远程调用API接口
* </p>
*
* @author atguigu
*/
@FeignClient(value = "service-search", path = "api/search",fallback = SearchDegradeFeignClient.class)
public interface SearchFeignClient {
/**
* 为定时更新首页排行榜提供调用接口
* @return
*/
@GetMapping("/albumInfo/updateLatelyAlbumRanking")
public Result updateLatelyAlbumRanking();
}
远程调用服务降级容错类
package com.atguigu.tingshu.search.client.impl;
import com.atguigu.tingshu.common.result.Result;
import com.atguigu.tingshu.search.client.SearchFeignClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author: atguigu
* @create: 2023-12-05 22:23
*/
@Slf4j
@Component
public class SearchDegradeFeignClient implements SearchFeignClient {
@Override
public Result updateLatelyAlbumRanking() {
log.error("[搜索服务]执行了服务降级方法:updateLatelyAlbumRanking");
return null;
}
}
在DispatchJobHandler类编写任务
package com.atguigu.tingshu.dispatch.job;
import com.atguigu.tingshu.search.client.SearchFeignClient;
import com.atguigu.tingshu.user.client.UserFeignClient;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DispatchHandler {
@Autowired
private SearchFeignClient searchFeignClient;
/**
* 入门案例:任务
*/
@XxlJob("myJob")
public void task1() {
//获取任务参数
String jobParam = XxlJobHelper.getJobParam();
log.info("task1....{}", jobParam);
}
/**
* 定时任务:远程调用搜索服务更新专辑热门排行榜
*
* @return
*/
@XxlJob("updateAlbumRanking")
public ReturnT updateAlbumRanking() {
try {
log.info("正在执行更新排行榜-远程调用搜索服务更新专辑热门排行榜");
searchFeignClient.updateLatelyAlbumRanking();
return ReturnT.SUCCESS;
} catch (Exception e) {
log.error("[任务服务]时任务更新排行榜异常:{}", e);
return ReturnT.FAIL;
}
}
}
每1小时执行1次的Cron表达式:0 0 0/1 * * ?
JobHandler:updateLatelyAlbumRankingJob
购买VIP会员服务,到期后我们要更新状态,前面判断VIP服务的时候,我们不仅判断了vip状态值,还判断了VIP失效时间,这样即使vip过期了后状态还没改变,通过vip过期时间它也不能享受VIP服务了,为了减轻服务器压力,每天凌晨更新前一天到期的vip会员状态
定义controller接口
/**
* 更新VIP状态:处理过期会员
* @return
*/
@Operation(summary = "更新VIP状态:处理过期会员")
@GetMapping("/updateVipExpireStatus")
public Result updateVipExpireStatus(){
userInfoService.updateVipExpireStatus(new Date());
return Result.ok();
}
定义service接口
/**
* 更新VIP状态:处理过期会员
* @return
*/
void updateVipExpireStatus(Date date);
接口实现
/**
* 更新VIP状态:处理过期会员
* @return
*/
@Override
public void updateVipExpireStatus(Date date) {
userInfoMapper.updateVipExpireStatus(date);
}
UserInfoMapper
package com.atguigu.tingshu.user.mapper;
import com.atguigu.tingshu.model.user.UserInfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
@Mapper
public interface UserInfoMapper extends BaseMapper<UserInfo> {
/**
* 更新VIP状态:处理过期会员
*/
void updateVipExpireStatus(@Param("date") Date date);
}
UserInfoMapper.xml
<update id="updateVipExpireStatus">
UPDATE user_info set is_vip = 0 where is_deleted =0 and vip_expire_time < #{date}
</update>
远程调用Feign接口
/**
* 更新VIP状态:处理过期会员
* @return
*/
@GetMapping("/updateVipExpireStatus")
public Result updateVipExpireStatus();
远程调用服务降级容错类
@Override
public Result updateVipExpireStatus() {
log.error("[用户服务]提供远程调用方法updateVipExpireStatus执行服务降级");
return null;
}
在DispatchJobHandler类编写任务
package com.atguigu.tingshu.dispatch.job;
import com.atguigu.tingshu.search.client.SearchFeignClient;
import com.atguigu.tingshu.user.client.UserFeignClient;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DispatchHandler {
@Autowired
private UserFeignClient userFeignClient;
/**
* 定时任务:远程调用用户服务更新会员状态
*
* @return
*/
@XxlJob("updateVipExpireStatus")
public ReturnT updateVipExpireStatus() {
try {
log.info("正在执行更新会员过期-远程调用用户服务更新会员状态");
userFeignClient.updateVipExpireStatus();
return ReturnT.SUCCESS;
} catch (Exception e) {
log.error("[任务服务]定时任务更新VIP异常:{}", e);
return ReturnT.FAIL;
}
}
}
配置方式如上,重要参数如下:
每天0点执行1次的Cron表达式:0 0 0 * * ?
JobHandler:updateVipExpireStatusJob
配置方式如上,重要参数如下:
每1小时执行1次的Cron表达式:0 0 0/1 * * ?
JobHandler:updateLatelyAlbumStatJob
执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务(如何避免任务被重复执行),同时系统自动传递分片参数;可根据分片参数开发分片任务;
“分片广播” 以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。
“分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数,获取分片参数进行分片业务处理。
例如:银行每月21号进行扣款,21号之前进行对贷款客户,进行扣费前提醒。从17号-21号期间对30W贷款用户进行短信通知。
设计待处理任务表:30W条记录
id name mobile money status(0未通知,1已通知)
分片参数(调度中心动态传入)决定持久层查询当前执行器执行任务。将总分片数据+分片索引传入持久层
# 3个执行器-总分片数3 查询条件 用户ID%总分片数=当前分片索引
#模拟第一个执行器处理用户
# 1.查询10条当前执行器处理未发送短信用户列表
SELECT * from user_task where `status` = 0 and id % 3 = 0 limit 2;
# 2.给10个用于发送短信
# 3.将任务状态改为“1”
#模拟第二个执行器处理用户
SELECT * from user_task where `status` = 0 and id % 3 = 1 limit 2;
# 2.给10个用于发送短信
# 3.将任务状态改为“1”
#模拟第三个执行器处理用户
SELECT * from user_task where `status` = 0 and id % 3 = 2 limit 2;
# 2.给10个用于发送短信
# 3.将任务状态改为“1”
# 任务执行效率低,4个执行器-总分片数3 查询条件 用户ID%总分片数=当前分片索引
#模拟第一个执行器处理用户
# 1.查询10条当前执行器处理未发送短信用户列表
SELECT * from user_task where `status` = 0 and id % 4 = 0 limit 2;
# 2.给10个用于发送短信
# 3.将任务状态改为“1”
#模拟第二个执行器处理用户
SELECT * from user_task where `status` = 0 and id % 4 = 1 limit 2;
# 2.给10个用于发送短信
# 3.将任务状态改为“1”
#模拟第三个执行器处理用户
SELECT * from user_task where `status` = 0 and id % 4 = 2 limit 2;
# 2.给10个用于发送短信
# 3.将任务状态改为“1”
#模拟第三个执行器处理用户
SELECT * from user_task where `status` = 0 and id % 4 = 3 limit 2;
# 2.给10个用于发送短信
# 3.将任务状态改为“1”