irpas技术客

springboot集成xxl-job定时任务实现mongodb日志表定期归_小米吃辣椒2022

网络投稿 6314

????????

目录

一、归档配置表设计

二、手动创建线程池工具类 和 MongodbArchiveService 类

三、xxl-job日志归档定时任务入口

四、日志归档处理子类ArchivePrinterLogService


在项目开发中,无论是接口调用、异常记录还是历史信息记录,都免不了要记录日志。随着时间的推移,日志表中的数据会变得越来越多,定期归档日志就显得尤为重要。

一、归档配置表设计

为了能够适应需求的变化,归档日期和日志的失效时间可能随时会有变更,我们需要设计一个归档配置表,用来存储日志归档周期,失效时间,以及日志处理类名全路径等信息(这里全路径信息可以利用反射方式获得具体的实现类调用归档接口)

t_archive_rule 归档配置表

code :归档类型,name:归档子类全路径名称 ,table_name:需要归档的日志表名,

archive_day:归档距离当前时间n天之前的数据,archive_type:归档数据失效时间(超过90天自动删除)

二、手动创建线程池工具类 和 MongodbArchiveService 类

基础实体类

package com.purcotton.wms.recording.domain;

import com.purcotton.wms.general.domian.BaseDocEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.springframework.data.mongodb.core.index.IndexDirection;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

import java.util.Date;

/**
 * Printer log document persisted in the {@code archive_printer_log} collection.
 *
 * <p>Field declaration order is significant: the Lombok-generated all-args
 * constructor takes its parameters in this order.
 *
 * @author : lovezi0 2021/7/23 13:59
 * @version : 1.0
 **/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
@Document(collection = "archive_printer_log")
public class ArchivePrinterLogDoc extends BaseDocEntity {

    /** Print parameters. */
    private String params;

    /** Report code. */
    private String reportCode;

    /** Printed payload. */
    private String body;

    /** Error text. */
    private String error;

    /** Warehouse code. */
    private String whCode;

    /** Number of times this record has already been printed. */
    private Integer alreadyPrintCount;

    /** Print time. */
    private String printTime;

    /**
     * Expiry reference date. The descending index declared here carries an
     * {@code expireAfterSeconds} of 90 days (90 * 24 * 60 * 60 seconds).
     */
    @Indexed(direction = IndexDirection.DESCENDING, expireAfterSeconds = 90 * 24 * 60 * 60)
    private Date expreDate;
}

线程池工具类

package com.purcotton.wms.recording.util;

import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Factory methods for the thread pools used by the log-archiving job.
 *
 * <p>Both pools use {@code availableProcessors()} core threads and
 * {@code 2 * availableProcessors() + 1} maximum threads. Callers own the
 * returned executor and are responsible for shutting it down.
 *
 * @author 500007
 * @date 2021-12-21 14:44:46
 */
public class ThreadPoolUtil {

    /** Idle time, in seconds, after which threads above the core size are reclaimed. */
    private static final long KEEP_ALIVE_SECONDS = 10;

    /** Queue capacity of the "big task" pool (one task per log table). */
    private static final int BIG_TASK_QUEUE_CAPACITY = 10;

    /** Queue capacity of the "small task" pool (one task per page of data). */
    private static final int SMALL_TASK_QUEUE_CAPACITY = 20000;

    /**
     * Creates the "big task" pool: each log-table archiving task runs on one thread.
     *
     * <p>When the pool and its queue are full the submitting thread runs the
     * task itself ({@link ThreadPoolExecutor.CallerRunsPolicy}).
     *
     * @param poolName name handed to the thread factory
     * @return a new executor
     */
    public static ThreadPoolExecutor generateSchTaskThreadPoolExecutor(String poolName) {
        final int cores = Runtime.getRuntime().availableProcessors();
        // Keep-alive is expressed in SECONDS, matching the small-task pool. The previous
        // MICROSECONDS unit made surplus threads terminate almost immediately after
        // going idle, forcing them to be re-created for the next task.
        return new ThreadPoolExecutor(
                cores,
                2 * cores + 1,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(BIG_TASK_QUEUE_CAPACITY),
                new DefaultThreadFactory(poolName),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Creates the "small task" pool: each page of data is processed by one task.
     *
     * <p>Rejected submissions are handled by {@code CallerBlocksPolicy(20000)}.
     * NOTE(review): if this is Spring Integration's {@code CallerBlocksPolicy}, the
     * argument is the maximum wait for a free queue slot in milliseconds (20 s),
     * after which the submission fails with an exception. The earlier comment here
     * described a 60 s wait — confirm which value is intended.
     *
     * @param poolName name handed to the thread factory
     * @return a new executor
     */
    public static ThreadPoolExecutor generateSmallTaskThreadPoolExecutor(String poolName) {
        final int cores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cores,
                2 * cores + 1,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(SMALL_TASK_QUEUE_CAPACITY),
                new DefaultThreadFactory(poolName),
                new CallerBlocksPolicy(20000));
    }
}

MongodbArchiveService类:提供一些公共方法,包括创建查询请求体和创建失效时间索引的方法

package com.purcotton.wms.recording.service.impl; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse; import com.purcotton.wms.general.factory.WmsExceptionFactory; import com.purcotton.wms.general.utils.DateUtil; import com.purcotton.wms.recording.constants.LogCommConstants; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.index.*; import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; import java.lang.reflect.Field; import java.time.ZoneOffset; import java.util.Date; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.purcotton.wms.general.constant.DateConstant.DEFAULT_DATE_TIME; /** * @author 500007 * @ClassName: * @Description: * @date 2021年12月20日 19:59:56 */ @Service @Slf4j public class MongodbArchiveService { @Autowired private MongoTemplate mongoTemplate; /** * 通过mongoTemplate创建失效时间索引 * * @param entityClass * @param collectionName */ public void createIndex(Class entityClass, String collectionName,Integer expreDays) { //如果传入collectionName为空则从类的Document注解获取 if (StringUtils.isBlank(collectionName)) { Document document = (Document) entityClass.getDeclaredAnnotation(Document.class); collectionName = document.collection(); } if (StringUtils.isBlank(collectionName)) { throw WmsExceptionFactory.getException("创建索引失败,collectionName不能为空"); } //获取字段列表 Field[] declaredFields = entityClass.getDeclaredFields(); 
if (ArrayUtil.isEmpty(declaredFields)) { return; } //获取索引操作器 IndexOperations indexOperations = this.mongoTemplate.indexOps(collectionName); //获取索引列表 List<IndexInfo> indexInfo = indexOperations.getIndexInfo(); //转为索引名称list List<String> indexNameList = indexInfo.stream().map(IndexInfo::getName).collect(Collectors.toList()); for (Field field : declaredFields) { boolean annotationPresent = field.isAnnotationPresent(Indexed.class); if (annotationPresent) { //索引字段名称 String name = field.getName(); Indexed indexed = field.getDeclaredAnnotation(Indexed.class); //索引过期时间 int expireAfterSeconds; if(expreDays!=null){ expireAfterSeconds = expreDays*24*60*60; }else { expireAfterSeconds = indexed.expireAfterSeconds(); } //获取索引排序规则 IndexDirection indexDirection = indexed.direction(); Sort.Direction direction = indexDirection.compareTo(IndexDirection.DESCENDING) == 0 ? Sort.Direction.DESC : Sort.Direction.ASC; //数字型排序规则 String i = indexDirection.compareTo(IndexDirection.DESCENDING) == 0 ? LogCommConstants.DESC : LogCommConstants.ASC; //判断索引是否已经存在 boolean flag = indexNameList.contains(name + StrUtil.UNDERLINE + i); if (!flag) { Index index = new Index(name, direction); index.expire(expireAfterSeconds, TimeUnit.SECONDS); //创建索引 indexOperations.ensureIndex(index); } } } } /** * 计算归档日期,当前时间-x天 * @param header * @param now * @return */ public Long calcEndStamp(ArchiveHeaderPageResponseFeignResponse header,Date now){ //获取归档日期,归档该日期之前的数据 Integer archiveDate = header.getArchiveDaysBefore(); if (ObjectUtils.isEmpty(archiveDate)) { throw WmsExceptionFactory.getException("打印日志归档时间不能为空"); } Date endDate = DateUtils.addDays(now, -archiveDate); String endTimeStr = DateUtil.toString(endDate, DEFAULT_DATE_TIME); Long endStamp = null; if (StringUtils.isNotBlank(endTimeStr)) { endStamp = DateUtil.parseStringToDateTime(endTimeStr, DEFAULT_DATE_TIME).toInstant(ZoneOffset.of("+8")).toEpochMilli(); } return endStamp; } /** * 构建查询参数 * @param endStamp * @return */ public Query createQuery(Long endStamp) { 
Query query = new Query(); if (null != endStamp) { query.addCriteria(Criteria.where("createTime").lte(endStamp)); } query.with(Sort.by(Sort.Direction.DESC, "createTime")); return query; } } 三、xxl-job日志归档定时任务入口 package com.purcotton.wms.recording.schedule; import com.purcotton.feign.client.ArchiveHeaderFeignClient; import com.purcotton.feign.vo.request.ArchiveHeaderPageRequestFeignRequest; import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse; import com.purcotton.obj.response.PageResponse; import com.purcotton.obj.response.ResponseData; import com.purcotton.wms.recording.constants.LogCommConstants; import com.purcotton.wms.recording.util.ThreadPoolUtil; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; /** * @author: ww * @date: 2021/12/20 * @desc : */ @Slf4j @Component public class AchivedLogHandler implements ApplicationContextAware { @Autowired private ArchiveHeaderFeignClient archiveHeaderFeignClient; private static ApplicationContext applicationContext; private static CountDownLatch COUNT_DOWN_LATCH; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } /** * 日志记录归档通用方法 * 按配置表配置反射调用具体归档日志方法 */ @XxlJob("achivedLogHandler") public ReturnT<String> achivedLogCommon(String code) { final StopWatch stopWatch = new StopWatch("achivedLogHandler"); log.info("achivedLogHandler:日志归档执行开始,配置归档code:{}", 
code); stopWatch.start(); ArchiveHeaderPageRequestFeignRequest param = new ArchiveHeaderPageRequestFeignRequest(); param.setPageNum(1); param.setPageSize(999); param.setCode(LogCommConstants.ARCHIVE_HEADER_CODE); ResponseData<PageResponse<ArchiveHeaderPageResponseFeignResponse>> responseData = this.archiveHeaderFeignClient.findPage(param); if (responseData == null || responseData.getData() == null) { log.error("achivedLogHandler_error:【数据归档】归档编码{}不存在对应的归档设置", code); return ReturnT.SUCCESS; } PageResponse<ArchiveHeaderPageResponseFeignResponse> archiveHeader = responseData.getData(); List<ArchiveHeaderPageResponseFeignResponse> headerList = archiveHeader.getList(); if (CollectionUtils.isNotEmpty(headerList)) { ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.generateSchTaskThreadPoolExecutor("achivedLogHandler"); COUNT_DOWN_LATCH = new CountDownLatch(headerList.size()); String tableName = ""; try { for (ArchiveHeaderPageResponseFeignResponse header : headerList) { String className = header.getName(); tableName = header.getTableName(); Class cls = Class.forName(className); Object bean = applicationContext.getBean(cls); Method mothod = bean.getClass().getDeclaredMethod(LogCommConstants.ARCHIVE_LOG_METHOD, ArchiveHeaderPageResponseFeignResponse.class); threadPoolExecutor.execute(() -> { try { mothod.invoke(bean, header); } catch (Exception e) { log.error("achivedLogHandler_error:归档日志线程池任务执行失败,表名称为:" + header.getTableName(), e); } finally { COUNT_DOWN_LATCH.countDown(); } }); } } catch (Exception e) { log.error("achivedLogHandler_error:定时任务归档日志反射执行失败,表名称为:" + tableName, e); } finally { try { COUNT_DOWN_LATCH.await(); threadPoolExecutor.shutdown(); log.info("achivedLogHandler:日志归档任务执行结束,配置归档code:{}", code); } catch (InterruptedException e) { log.error("achivedLogHandler_error:定时任务归档线程池关闭异常", e); } stopWatch.stop(); log.info("achivedLogHandler:日志归档定时任务执行结束,总耗时:{}秒", stopWatch.getTotalTimeSeconds()); } } return ReturnT.SUCCESS; } }

为了提升日志归档的效率,这里采用线程池进行处理:首先根据code查询所需归档的配置列表,然后for循环遍历得到每条配置的name(即归档处理子类的全路径类名),根据该类名从spring上下文中获取对应的归档任务处理bean,最后通过反射调用其归档方法。

四、日志归档处理子类ArchivePrinterLogService

前面我们配置了6个日志归档配置数据,也就是说我们会有6个对应的日志归档子类分别进行归档逻辑处理,这里我们仅以一个ArchivePrinterLogService代码作为展示。其他子类归档逻辑类似。

package com.purcotton.wms.recording.service.impl;

import cn.hutool.core.util.ObjectUtil;
import com.purcotton.feign.vo.response.ArchiveHeaderPageResponseFeignResponse;
import com.purcotton.wms.general.domian.PrinterLogDoc;
import com.purcotton.wms.general.factory.WmsExceptionFactory;
import com.purcotton.wms.general.service.impl.BaseMongoService;
import com.purcotton.wms.recording.constants.LogCommConstants;
import com.purcotton.wms.recording.domain.ArchivePrinterLogDoc;
import com.purcotton.wms.recording.domain.ShipmentCheckPackingScaledLogDoc;
import com.purcotton.wms.recording.service.AcnhiveLogCommeService;
import com.purcotton.wms.recording.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StopWatch;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Archives printer logs: copies old {@code PrinterLogDoc} records into the
 * {@code archive_printer_log} collection page by page on a thread pool, then
 * deletes them from the source once every record has been copied.
 *
 * @author 500007
 * @date 2021-12-20 16:16:20
 */
@Service
@Slf4j
public class ArchivePrinterLogService extends BaseMongoService<ArchivePrinterLogDoc> implements AcnhiveLogCommeService {

    @Autowired
    private MongodbArchiveService mongodbArchiveService;

    /** Number of records actually inserted into the archive during the current run. */
    private AtomicInteger archiveTotal = null;

    @Override
    public String getCollectionsName() {
        return "archive_printer_log";
    }

    /**
     * Runs one archiving pass for the given rule and then makes sure the expiry
     * index exists on the archive collection. Failures are logged, not rethrown.
     *
     * @param header archive rule supplying the age threshold ({@code archiveDaysBefore})
     *               and the expiry in days ({@code archiveType})
     */
    @Override
    public void acnhiveLogOperation(ArchiveHeaderPageResponseFeignResponse header) {
        log.info("achivedLogHandler:{}表开始归档{}天前的日志", getCollectionsName(), header.getArchiveDaysBefore());
        StopWatch stopWatch = new StopWatch(getCollectionsName());
        stopWatch.start();
        long totalCount = 0;
        archiveTotal = new AtomicInteger(0);
        try {
            // Expiry, in days, for archived records.
            Integer expreDays = header.getArchiveType();
            // Page through the source data and archive it.
            totalCount = this.processLogData(LogCommConstants.ARCHIVE_PAGE_SIZE, header);
            // Create the expiry index if it does not exist yet.
            mongodbArchiveService.createIndex(entityClass, getCollectionsName(), expreDays);
        } catch (Exception e) {
            log.error("achivedLogHandler_error:打印日志归档失败", e);
        } finally {
            stopWatch.stop();
            log.info("achivedLogHandler:{}表完成{}天前的日志归档,共计归档{}条数据,耗时:{}秒", getCollectionsName(),
                    header.getArchiveDaysBefore(), totalCount, stopWatch.getTotalTimeSeconds());
        }
    }

    /**
     * Archives all source records older than the rule's cut-off, one pool task per
     * page, and removes them from the source only when the number of archived
     * records equals the number counted up front.
     *
     * <p>NOTE(review): pages are cut with skip/limit over a sort on
     * {@code createTime} alone; records sharing a timestamp may not be ordered
     * stably across pages. The count check below guards the delete, but confirm
     * whether a unique tie-breaker is needed.
     *
     * @param pageSize number of records per page
     * @param header   archive rule
     * @return the number of records archived in this pass (equals the source count
     *         on success; the number actually inserted on failure)
     */
    private long processLogData(int pageSize, ArchiveHeaderPageResponseFeignResponse header) {
        ThreadPoolExecutor threadPoolExecutor =
                ThreadPoolUtil.generateSmallTaskThreadPoolExecutor(getCollectionsName());
        try {
            Long endStamp = mongodbArchiveService.calcEndStamp(header, new Date());
            log.info("achivedLogHandler:打印日志归档时间戳{}", endStamp);
            Query query = mongodbArchiveService.createQuery(endStamp);
            long count = mongoTemplate.count(query, PrinterLogDoc.class);
            if (count <= 0) {
                return 0;
            }

            // Ceiling division: a trailing partial page still needs its own task.
            long pages = (count + pageSize - 1) / pageSize;
            CountDownLatch countDownLatch = new CountDownLatch((int) pages);
            for (long pageIndex = 0; pageIndex < pages; pageIndex++) {
                Query subQuery = mongodbArchiveService.createQuery(endStamp);
                subQuery.limit(pageSize);
                subQuery.skip(pageIndex * pageSize);
                try {
                    threadPoolExecutor.execute(new archiveLogRunable(subQuery, this, countDownLatch));
                } catch (Exception e) {
                    log.error("achivedLogHandler_error:打印日志归档异常,threadPoolName:" + getCollectionsName(), e);
                    // The task was rejected and will never count down itself; without
                    // this the await() below would block forever.
                    countDownLatch.countDown();
                }
            }

            // Wait for every page task to finish.
            countDownLatch.await();

            int archived = archiveTotal.get();
            log.info("achivedLogHandler:打印日志归档原表数据总数{},实际归档总数{}", count, archived);
            if (count != archived) {
                throw WmsExceptionFactory.getException(
                        "achivedLogHandler_error:打印日志归档原表数据和归档数据不一致,threadPoolName:" + getCollectionsName());
            }
            // Everything was copied, so the old source records can be deleted.
            mongoTemplate.remove(query, PrinterLogDoc.class);
            return count;
        } catch (InterruptedException e) {
            // Restore the flag so callers up the stack can react to the interruption.
            Thread.currentThread().interrupt();
            log.error("achivedLogHandler_error:打印日志归档异常,threadPoolName:" + getCollectionsName(), e);
        } catch (Exception e) {
            log.error("achivedLogHandler_error:打印日志归档异常,threadPoolName:" + getCollectionsName(), e);
        } finally {
            threadPoolExecutor.shutdown();
        }
        // Failure path: report only what was really written to the archive.
        return archiveTotal.get();
    }

    /**
     * Pool task that copies one page of source records into the archive
     * collection and adds the number inserted to {@code archiveTotal}.
     */
    class archiveLogRunable implements Runnable {

        private final Query query;
        private final CountDownLatch countDownLatch;

        /**
         * @param query                    page query (cut-off filter, sort, skip and limit)
         * @param archivePrinterLogService not used; kept so the constructor signature is unchanged
         * @param countDownLatch           latch counted down once when this task ends
         */
        public archiveLogRunable(Query query, ArchivePrinterLogService archivePrinterLogService,
                                 CountDownLatch countDownLatch) {
            this.query = query;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                List<PrinterLogDoc> sourceDocs = mongoTemplate.find(query, PrinterLogDoc.class);
                if (CollectionUtils.isEmpty(sourceDocs)) {
                    return;
                }
                List<ArchivePrinterLogDoc> archiveList = new ArrayList<>(sourceDocs.size());
                // One timestamp for the whole page.
                Date archivedAt = new Date();
                for (PrinterLogDoc sourceDoc : sourceDocs) {
                    ArchivePrinterLogDoc archiveDoc = new ArchivePrinterLogDoc();
                    BeanUtils.copyProperties(sourceDoc, archiveDoc);
                    archiveDoc.setExpreDate(archivedAt);
                    archiveList.add(archiveDoc);
                }
                // Bulk insert into the archive collection.
                Collection<ArchivePrinterLogDoc> inserted = mongoTemplate.insertAll(archiveList);
                if (!CollectionUtils.isEmpty(inserted)) {
                    archiveTotal.addAndGet(inserted.size());
                }
            } catch (Exception e) {
                log.error("achivedLogHandler_error:打印日志记录处理批次任务异常", e);
            } finally {
                this.countDownLatch.countDown();
            }
        }
    }
}

同样为了提高归档效率,采取了分页查询和线程池并行处理的方式。

注意:我们需要在原始的表中创建一些索引,防止数据量过大时,分页查询出来的数据超过 mongodb 最大字节数而抛出异常。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #