1、批量操作
方案设计
-
基本功能实现
-
java">/** * 批量添加题目和题库关联 * * @param questionIdList 题目id列表 * @param questionBankId 题库id * @param loginUser 登录用户 */ @Override @Transactional(rollbackFor = Exception.class) public void batchAddQuestionBankQuestion(List<Long> questionIdList, Long questionBankId, User loginUser) { //参数校验 ThrowUtils.throwIf(CollUtil.isEmpty(questionIdList), ErrorCode.PARAMS_ERROR, "题目列表不能为空"); ThrowUtils.throwIf(questionBankId <=0, ErrorCode.PARAMS_ERROR, "题目列表不能为空"); ThrowUtils.throwIf(loginUser == null, ErrorCode.PARAMS_ERROR, "用户未登录"); //检查题目id是否存在 List<Question> questions = questionService.listByIds(questionIdList); List<Long> QuestionIdList = questions.stream() .map(Question::getId) .collect(Collectors.toList()); ThrowUtils.throwIf(CollUtil.isEmpty(QuestionIdList), ErrorCode.PARAMS_ERROR, "题目不存在"); //检查题库id是否存在 QuestionBank questionBank = questionBankService.getById(questionBankId); ThrowUtils.throwIf(questionBank == null, ErrorCode.PARAMS_ERROR, "题库不存在"); //批量插入题库题目关联数据,未使用批量插入方法,数据量不是很大,可以考虑使用批量插入方法 //todo 使用批量插入方法 for (Long questionId : QuestionIdList) { QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion(); questionBankQuestion.setQuestionBankId(questionBankId); questionBankQuestion.setQuestionId(questionId); questionBankQuestion.setUserId(loginUser.getId()); boolean result = this.save(questionBankQuestion); ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "添加失败"); } }
批量操作优化
健壮性
-
对代码进行更详细的异常处理
-
对代码进行更多的参数校验
-
java">/** * 批量添加题目和题库关联 * * @param questionIdList 题目id列表 * @param questionBankId 题库id * @param loginUser 登录用户 */ @Override @Transactional(rollbackFor = Exception.class) public void batchAddQuestionBankQuestion(List<Long> questionIdList, Long questionBankId, User loginUser) { //参数校验 ThrowUtils.throwIf(CollUtil.isEmpty(questionIdList), ErrorCode.PARAMS_ERROR, "题目列表不能为空"); ThrowUtils.throwIf(questionBankId <=0, ErrorCode.PARAMS_ERROR, "题目列表不能为空"); ThrowUtils.throwIf(loginUser == null, ErrorCode.PARAMS_ERROR, "用户未登录"); //检查题目id是否存在 List<Question> questions = questionService.listByIds(questionIdList); List<Long> questionIdLongList = questions.stream() .map(Question::getId) .collect(Collectors.toList()); ThrowUtils.throwIf(CollUtil.isEmpty(questionIdLongList), ErrorCode.PARAMS_ERROR, "题目不存在"); //获取合法的题目id列表 List<Long> questionIdLongList = questionService.listObjs(questionLambdaQueryWrapper, obj -> (Long)obj); ThrowUtils.throwIf(CollUtil.isEmpty(questionIdLongList), ErrorCode.PARAMS_ERROR, "题目不存在"); //检查题库id是否存在 QuestionBank questionBank = questionBankService.getById(questionBankId); ThrowUtils.throwIf(questionBank == null, ErrorCode.PARAMS_ERROR, "题库不存在"); //检查题目还不存在与题库中,过滤已存在题库中的题目 LambdaQueryWrapper<QuestionBankQuestion> lambdaQueryWrapper = Wrappers.lambdaQuery(QuestionBankQuestion.class) .eq(QuestionBankQuestion::getQuestionBankId, questionBankId) .in(QuestionBankQuestion::getQuestionId, questionIdLongList); List<QuestionBankQuestion> existQuestionList = this.list(lambdaQueryWrapper); //已存在题库中的题目 Set<Long> questionIdLongSet = existQuestionList.stream() .map(QuestionBankQuestion::getQuestionId) .collect(Collectors.toSet()); //过滤掉已存在题库中的题目 questionIdLongList = questionIdLongList.stream().filter(questionId -> { return !questionIdLongSet.contains(questionId); }).collect(Collectors.toList()); ThrowUtils.throwIf(CollUtil.isEmpty(questionIdLongList), ErrorCode.PARAMS_ERROR, "题目都存在与题库中"); //批量插入题库题目关联数据,未使用批量插入方法,数据量不是很大,可以考虑使用批量插入方法 //todo 使用批量插入方法 for (Long questionId : questionIdLongList) { QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion(); questionBankQuestion.setQuestionBankId(questionBankId); questionBankQuestion.setQuestionId(questionId); questionBankQuestion.setUserId(loginUser.getId()); try { boolean result = this.save(questionBankQuestion); if (!result) { throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败"); } } catch (DataIntegrityViolationException e) { log.error("数据库唯一键冲突或违反其他完整性约束,题目 id: {}, 题库 id: {}, 错误信息: {}", questionId, questionBankId, e.getMessage()); throw new BusinessException(ErrorCode.OPERATION_ERROR, "题目已存在于该题库,无法重复添加"); } catch (DataAccessException e) { log.error("数据库连接问题、事务问题等导致操作失败,题目 id: {}, 题库 id: {}, 错误信息: {}", questionId, questionBankId, e.getMessage()); throw new BusinessException(ErrorCode.OPERATION_ERROR, "数据库操作失败"); } catch (Exception e) { // 捕获其他异常,做通用处理 log.error("添加题目到题库时发生未知错误,题目 id: {}, 题库 id: {}, 错误信息: {}", questionId, questionBankId, e.getMessage()); throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败"); } }
-
稳定性
避免长事务问题
-
java">//批量插入题库题目关联数据,未使用批量插入方法,数据量不是很大,可以考虑使用批量插入方法 //todo 使用批量插入方法 //分批次插入题库题目关联数据 final int batchSize = 1000; int questionIdLongListSize = questionIdLongList.size(); for (int i = 0; i < questionIdLongListSize; i ++) { //生成当前批次的题目id列表 List<Long> currentQuestionIdList = questionIdLongList.subList(i, Math.min(i + batchSize, questionIdLongListSize)); List<QuestionBankQuestion> questionBankQuestions = currentQuestionIdList.stream() .map(questionId -> { QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion(); questionBankQuestion.setQuestionBankId(questionBankId); questionBankQuestion.setQuestionId(questionId); questionBankQuestion.setUserId(loginUser.getId()); return questionBankQuestion; }).collect(Collectors.toList()); } //使用事务管理每个批次的插入操作 //获取事务代理,防止事务失效 QuestionBankQuestionServiceImpl questionBankQuestionService = (QuestionBankQuestionServiceImpl) AopContext.currentProxy(); questionBankQuestionService.batchAddQuestionBankQuestionInner(notExistQuestionList);
-
java">/** * 批量添加题目和题库关联 内部方法,不对外暴露 * @param questionBankQuestionLists 题库题目关联列表 */ @Transactional(rollbackFor = Exception.class) @Override public void batchAddQuestionBankQuestionInner(List<QuestionBankQuestion> questionBankQuestionLists){ for (QuestionBankQuestion questionBankQuestion : questionBankQuestionLists) { Long questionId = questionBankQuestion.getQuestionId(); Long questionBankId = questionBankQuestion.getQuestionBankId(); try { boolean result = this.save(questionBankQuestion); if (!result) { throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败"); } } catch (DataIntegrityViolationException e) { log.error("数据库唯一键冲突或违反其他完整性约束,题目 id: {}, 题库 id: {}, 错误信息: {}", questionId, questionBankId, e.getMessage()); throw new BusinessException(ErrorCode.OPERATION_ERROR, "题目已存在于该题库,无法重复添加"); } catch (DataAccessException e) { log.error("数据库连接问题、事务问题等导致操作失败,题目 id: {}, 题库 id: {}, 错误信息: {}", questionId, questionBankId, e.getMessage()); throw new BusinessException(ErrorCode.OPERATION_ERROR, "数据库操作失败"); } catch (Exception e) { // 捕获其他异常,做通用处理 log.error("添加题目到题库时发生未知错误,题目 id: {}, 题库 id: {}, 错误信息: {}", questionId, questionBankId, e.getMessage()); throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败"); } } }
性能
使用批量插入数据
-
java">boolean result = this.saveBatch(questionBankQuestionLists);
sql优化
-
java">//检查题目id是否存在,使用了select* List<Question> questions = questionService.listByIds(questionIdList); //优化 LambdaQueryWrapper<Question> questionLambdaQueryWrapper = Wrappers.lambdaQuery(Question.class) .select(Question::getId) .in(Question::getId, questionIdList); List<Question> questions = questionService.list(questionLambdaQueryWrapper);
-
java">//因为我们只要id列表,直接将查询到的数据转换为id列表 List<Long> questionIdLongList = questionService.listObjs(questionLambdaQueryWrapper, obj -> (Long)obj);
并发编程
CompletableFuture 是Java8中引入的一个类,用于表示异步操作的结果。它是 Future 的增强版本,不仅可以表示一个
异步计算,还可以对异步计算的结果进行组合、转换和处理,实现异步任务的编排。?但是要注意,CompletableFuture 默认使用的是 ForkJoinPool.commonpool()方法得到的线程池,这是一个全局共享
的线程池,如果有多种不同的任务都依赖该线程池进行处理,可能会导致资源争抢、代码阻塞等不确定的问题。所以建议
针对每种任务,自定义线程池来处理,实现线程池资源的隔离。如何分配线程池参数
- 对于计算密集型任务(消耗 CPU 资源),设置核心线程数为 n+1 或者 n(n是 CPU 核心数),可以充分利用 CPU.
多一个线程是为了可以在某些线程短暂阻塞或执行调度时,确保有足够的线程保持 CPU 繁忙,最大化 CPU 的利用
率。- 对于 I/O 密集型任务(消耗 I/O资源),可以增大核心线程数为 CPU 核心数的2-4倍,可以提升并发执行任务的数
重。
-
批处理时是串行的,可以使用并发编程,使其并行,提升运行速度
-
对于数据库的批量插入业务,是属于IO密集型的任务
java">//自定义线程池 //自定义线程池,使用线程池执行插入操作 ThreadPoolExecutor customExecutor = new ThreadPoolExecutor( 20,//核心线程数 40,//最大线程数 60L,//线程存活时间为60秒 TimeUnit.SECONDS,//时间单位 new ArrayBlockingQueue<>(10000),//阻塞队列大小 new ThreadPoolExecutor.DiscardPolicy() //拒绝策略,由调用线程处理 ); List<CompletableFuture<Void>> futures = new ArrayList<>(); //分批次插入题库题目关联数据 final int batchSize = 1000; int questionIdLongListSize = questionIdLongList.size(); for (int i = 0; i < questionIdLongListSize; i ++) { //生成当前批次的题目id列表 List<Long> currentQuestionIdList = questionIdLongList.subList(i, Math.min(i + batchSize, questionIdLongListSize)); List<QuestionBankQuestion> questionBankQuestions = currentQuestionIdList.stream() .map(questionId -> { QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion(); questionBankQuestion.setQuestionBankId(questionBankId); questionBankQuestion.setQuestionId(questionId); questionBankQuestion.setUserId(loginUser.getId()); return questionBankQuestion; }).collect(Collectors.toList()); //使用事务管理每个批次的插入操作 //获取事务代理,防止事务失效 QuestionBankQuestionServiceImpl questionBankQuestionService = (QuestionBankQuestionServiceImpl) AopContext.currentProxy(); //异步执行插入操作,生成异步任务 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { questionBankQuestionService.batchAddQuestionBankQuestionInner(questionBankQuestions); }, customExecutor).exceptionally(ex -> { log.error("批处理插入失败", ex); return null; }); //添加异步任务到列表 futures.add(future); } //等待所有异步任务完成 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); //关闭线程池 customExecutor.shutdown(); }
数据库连接池优化
-
数据库连接池是用于管理与数据库之间连接的资源池,它能够 复用 现有的数据库连接,而不是在每次请求时都新建和销毁连接,从而提升系统的性能和响应速度。
-
java"><!-- druid数据库连接池--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.2.23</version> </dependency>
-
配置druid
-
spring: # 数据源配置 datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/interview_bank username: root password: zzj0806 # 指定数据源类型 type: com.alibaba.druid.pool.DruidDataSource # Druid 配置 druid: # 配置初始化大小、最小、最大 initial-size: 10 minIdle: 10 max-active: 10 # 配置获取连接等待超时的时间(单位:毫秒) max-wait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 time-between-eviction-runs-millis: 2000 # 配置一个连接在池中最小生存的时间,单位是毫秒 min-evictable-idle-time-millis: 600000 max-evictable-idle-time-millis: 900000 # 用来测试连接是否可用的SQL语句,默认值每种数据库都不相同,这是mysql validationQuery: select 1 # 应用向连接池申请连接,并且testOnBorrow为false时,连接池将会判断连接是否处于空闲状态,如果是,则验证这条连接是否可用 testWhileIdle: true # 如果为true,默认是false,应用向连接池申请连接时,连接池会判断这条连接是否是可用的 testOnBorrow: false # 如果为true(默认false),当应用使用完连接,连接池回收连接的时候会判断该连接是否还可用 testOnReturn: false # 是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大,比如说oracle poolPreparedStatements: true # 要启用PSCache,必须配置大于0,当大于0时, poolPreparedStatements自动触发修改为true, # 在Druid中,不会存在Oracle下PSCache占用内存过多的问题, # 可以把这个数值配置大一些,比如说100 maxOpenPreparedStatements: 20 # 连接池中的minIdle数量以内的连接,空闲时间超过minEvictableIdleTimeMillis,则会执行keepAlive操作 keepAlive: true # Spring 监控,利用aop 对指定接口的执行时间,jdbc数进行记录 aop-patterns: "com.springboot.template.dao.*" ########### 启用内置过滤器(第一个 stat 必须,否则监控不到SQL)########## filters: stat,wall,log4j2 # 自己配置监控统计拦截的filter filter: # 开启druiddatasource的状态监控 stat: enabled: true db-type: mysql # 开启慢sql监控,超过2s 就认为是慢sql,记录到日志中 log-slow-sql: true slow-sql-millis: 2000 # 日志监控,使用slf4j 进行日志输出 slf4j: enabled: true statement-log-error-enabled: true statement-create-after-log-enabled: false statement-close-after-log-enabled: false result-set-open-after-log-enabled: false result-set-close-after-log-enabled: false ########## 配置WebStatFilter,用于采集web关联监控的数据 ########## web-stat-filter: enabled: true # 启动 StatFilter url-pattern: /* # 过滤所有url exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*" # 排除一些不必要的url session-stat-enable: true # 开启session统计功能 session-stat-max-count: 1000 # session的最大个数,默认100 ########## 配置StatViewServlet(监控页面),用于展示Druid的统计信息 ########## stat-view-servlet: enabled: true # 启用StatViewServlet url-pattern: /druid/* # 访问内置监控页面的路径,内置监控页面的首页是/druid/index.html reset-enable: false # 不允许清空统计数据,重新计算 login-username: root # 配置监控页面访问密码 login-password: 123 allow: 127.0.0.1 # 允许访问的地址,如果allow没有配置或者为空,则允许所有访问 deny: # 拒绝访问的地址,deny优先于allow,如果在deny列表中,就算在allow列表中,也会被拒绝
-
-
数据一致性
添加事务
使用分布式锁
- 使用Redis和redisson分布式锁实现
使用乐观锁
- 简称CAS,给数据添加版本号。
可观测性
日志记录
监控
- 例如druid监控等等
返回值优化
-
定义一个返回值对象,包含失败的原因,帮助定位失败的信息
-
java">public class BatchAddResult { private int total; private int successCount; private int failureCount; private List<String> failureReasons; }
-
-
示例代码
-
java">public BatchAddResult batchAddQuestionsToBank(List<Long> questionIdList, Long questionBankId, User loginUser) { BatchAddResult result = new BatchAddResult(); result.setTotal(questionIdList.size()); // 执行批量插入逻辑 for (Long questionId : questionIdList) { try { // 插入操作 saveQuestionToBank(questionId, questionBankId, loginUser); result.setSuccessCount(result.getSuccessCount() + 1); } catch (Exception e) { result.setFailureCount(result.getFailureCount() + 1); result.getFailureReasons().add("题目ID " + questionId + " 插入失败:" + e.getMessage()); } } return result; // 返回批量处理的结果 }
-