day7
定时任务接口抽取:
原场景:用户上传文件->调用阿里云文档解析->定时任务隔一段时间查询结果
现场景:用户上传文件->调用阿里云文档解析->结束
定时任务平台->调用定时任务接口->查询处理结果->写入数据库
package com.fj.report.controller;
import com.fj.report.service.ScheduleParseTaskService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/schedule-parse-task")
@Slf4j
@Tag(name = "定时解析任务", description = "定时解析任务相关操作API")
public class ScheduleParseTaskController {
@Autowired
private ScheduleParseTaskService scheduleParseTaskService;
/**
* 扫描并处理待处理的解析任务
* 该接口可以被定时任务平台调用
*/
@PostMapping("/scan-and-process")
@Operation(summary = "扫描并处理待处理的解析任务")
public ResponseEntity<Map<String, Object>> scanAndProcessPendingTasks() {
Map<String, Object> result = new HashMap<>();
try {
log.info("开始扫描并处理解析任务");
ScheduleParseTaskService.TaskProcessResult processResult =
scheduleParseTaskService.scanAndProcessPendingTasks();
result.put("success", true);
result.put("message", "任务处理完成");
result.put("processedCount", processResult.getProcessedCount());
result.put("successCount", processResult.getSuccessCount());
result.put("failureCount", processResult.getFailureCount());
log.info("任务处理完成: 总处理数={}, 成功数={}, 失败数={}",
processResult.getProcessedCount(),
processResult.getSuccessCount(),
processResult.getFailureCount());
return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("扫描并处理解析任务异常", e);
result.put("success", false);
result.put("message", "任务处理异常: " + e.getMessage());
return ResponseEntity.status(500).body(result);
}
}
}
package com.fj.report.service;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 解析任务调度服务接口
*/
public interface ScheduleParseTaskService {
/**
* 定时扫描并处理待处理的解析任务
* @return 任务处理结果
*/
TaskProcessResult scanAndProcessPendingTasks();
/**
* 任务处理结果
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
class TaskProcessResult {
/**
* 处理的任务总数
*/
private Integer processedCount;
/**
* 成功的任务数
*/
private Integer successCount;
/**
* 失败的任务数
*/
private Integer failureCount;
}
}
package com.fj.report.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fj.report.entity.SysCaseFileParseTask;
import com.fj.report.service.ScheduleParseTaskService;
import com.fj.report.service.PdfRecognitionService;
import com.fj.report.service.SysCaseFileParseTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
import static com.fj.report.common.comstants.CommonConstants.MAX_FILE_PARSE_TIMEOUT_MINUTES;
/**
* 解析任务调度服务实现类
*
* @author fj
*/
@Service
@Slf4j
public class ScheduleParseTaskServiceImpl implements ScheduleParseTaskService {
@Autowired
private SysCaseFileParseTaskService sysCaseFileParseTaskService;
@Autowired
private PdfRecognitionService pdfRecognitionService;
/**
* 扫描并处理待处理的解析任务
*/
@Override
public TaskProcessResult scanAndProcessPendingTasks() {
log.debug("扫描并处理待处理的解析任务...");
int processedCount = 0;
int successCount = 0;
int failureCount = 0;
try {
// 查询状态为 0(待处理) 和 1(处理中) 的任务
LambdaQueryWrapper<SysCaseFileParseTask> wrapper = new LambdaQueryWrapper<>();
wrapper.in(SysCaseFileParseTask::getStatus, 0, 1)
.orderByAsc(SysCaseFileParseTask::getCreateTime);
List<SysCaseFileParseTask> tasks = sysCaseFileParseTaskService.list(wrapper);
if (tasks.isEmpty()) {
log.debug("没有待处理的解析任务。");
return new TaskProcessResult(0, 0, 0);
}
log.info("发现 {} 个待处理的解析任务", tasks.size());
// 逐个处理任务
for (SysCaseFileParseTask task : tasks) {
try {
// 处理单个任务
processTask(task);
processedCount++;
// 检查任务是否成功
if (task.getStatus() == 2) {
successCount++;
} else if (task.getStatus() == 3) {
failureCount++;
}
} catch (Exception e) {
log.error("处理任务异常: taskId={}", task.getId(), e);
failureCount++;
processedCount++;
}
}
log.info("任务扫描完成: 总处理数={}, 成功数={}, 失败数={}",
processedCount, successCount, failureCount);
return new TaskProcessResult(processedCount, successCount, failureCount);
} catch (Exception e) {
log.error("扫描任务异常", e);
throw new RuntimeException("扫描任务异常: " + e.getMessage(), e);
}
}
/**
* 解析单个任务
*
* @param task
*/
private void processTask(SysCaseFileParseTask task) {
log.info("处理解析任务:taskId={},status={}", task.getId(), task.getStatus());
// 检查任务是否超时(超过30分钟)
if (isTaskTimeout(task)) {
sysCaseFileParseTaskService.updateTaskFailed(task.getId(), "任务处理超时");
log.warn("任务超时,更新状态为失败: taskId={}", task.getId());
return;
}
// 查询第三方状态
String statusResult = pdfRecognitionService.queryDocParsingStatus(task.getThirdPlatformTaskId());
if (statusResult == null) {
log.warn("查询第三方任务状态返回为null: taskId={}, thirdTaskId={}",
task.getId(), task.getThirdPlatformTaskId());
return;
}
// 解析状态
JSONObject statusJson = null;
try {
statusJson = JSON.parseObject(statusResult);
} catch (Exception e) {
log.error("解析第三方响应JSON异常: taskId={}, response={}", task.getId(), statusResult, e);
sysCaseFileParseTaskService.updateTaskFailed(task.getId(), "响应解析异常: " + e.getMessage());
return;
}
if (statusJson == null) {
log.warn("状态响应JSON为null: taskId={}", task.getId());
return;
}
JSONObject data = statusJson.getJSONObject("data");
if (data == null) {
log.warn("第三方响应中没有data字段: taskId={}, response={}", task.getId(), statusResult);
return;
}
String status = data.getString("status");
Integer numberOfSuccessfulParsing = data.getInteger("numberOfSuccessfulParsing");
log.info("任务状态:taskId={},status={},numberOfSuccessfulParsing={}",
task.getId(), status, numberOfSuccessfulParsing);
// 根据任务状态进行处理
handleTaskStatus(task, status, numberOfSuccessfulParsing, statusJson, data);
}
/**
* 根据任务状态处理任务
*
* @param task
* @param status
* @param numberOfSuccessfulParsing
* @param statusJson
* @param data
*/
private void handleTaskStatus(SysCaseFileParseTask task, String status, Integer numberOfSuccessfulParsing,
JSONObject statusJson, JSONObject data) {
if (status == null) {
log.warn("任务状态为null: taskId={}", task.getId());
return;
}
if ("success".equalsIgnoreCase(status)) {
// 任务成功,获取解析结果
log.info("任务成功,开始获取结果: taskId={}, numberOfSuccessfulParsing={}",
task.getId(), numberOfSuccessfulParsing);
sysCaseFileParseTaskService.fetchAndSaveResults(task.getId(), numberOfSuccessfulParsing);
} else if ("fail".equalsIgnoreCase(status)) {
// 任务失败,更新状态
String failCode = data.getString("code");
String failMessage = data.getString("message");
if (failMessage == null) {
failMessage = statusJson.getString("Message");
}
if (failCode == null) {
failCode = statusJson.getString("Code");
}
log.error("任务失败: taskId={}, failCode={}, failMessage={}", task.getId(), failCode, failMessage);
sysCaseFileParseTaskService.updateTaskFailed(task.getId(), failCode, failMessage);
} else if ("processing".equalsIgnoreCase(status)) {
// 任务处理中,更新状态
if (task.getStatus() != 1) {
log.debug("任务处理中: taskId={}", task.getId());
sysCaseFileParseTaskService.updateTaskProcessing(task.getId());
}
} else if ("init".equalsIgnoreCase(status)) {
// 任务初始化,保持待处理状态
log.debug("任务处于初始化状态,保持待处理状态:taskId={}", task.getId());
} else {
log.warn("未知的任务状态: taskId={}, status={}", task.getId(), status);
}
}
/**
* 检查任务是否超时
*
* @param task
* @return
*/
private boolean isTaskTimeout(SysCaseFileParseTask task) {
LocalDateTime startTime = task.getProcessingStartTime();
if (startTime == null) {
startTime = task.getCreateTime();
}
LocalDateTime now = LocalDateTime.now();
long minutes = java.time.Duration.between(startTime, now).toMinutes();
return minutes > MAX_FILE_PARSE_TIMEOUT_MINUTES; // 超过30分钟视为超时
}
}
java解压zip文件
对应需求:用户上传zip压缩包,自动解析zip中的文件,上传到OSS
导入POM文件:
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.28.0</version>
</dependency>
需要注意问题:编码问题、OSS上传接口为MultiPartFile接口,需要自定义一个接口将字节流转换为需要的格式
test测试代码:
字节流到MultiPartFile:
package com.fj.report.common.utils;
import org.springframework.util.FileCopyUtils;
import org.springframework.web.multipart.MultipartFile;
import javax.validation.constraints.NotNull;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
public class ZipFileUtils implements MultipartFile {
private final String name;
private final String originalFilename;
private final String contentType;
private final byte[] content;
public ZipFileUtils(String name, String originalFilename, String contentType, byte[] content) {
this.name = name;
this.originalFilename = originalFilename;
this.contentType = contentType != null ? contentType : "application/octet-stream";
this.content = content;
}
@Override
@NotNull
public String getName() {
return this.name;
}
@Override
public String getOriginalFilename() {
return this.originalFilename;
}
@Override
public String getContentType() {
return this.contentType;
}
@Override
public boolean isEmpty() {
return this.content.length == 0;
}
@Override
public long getSize() {
return this.content.length;
}
@Override
@NotNull
public byte[] getBytes() throws IOException {
return this.content;
}
@Override
@NotNull
public InputStream getInputStream() throws IOException {
return new ByteArrayInputStream(this.content);
}
@Override
public void transferTo(@NotNull File dest) throws IOException, IllegalStateException {
FileCopyUtils.copy(this.content, dest);
}
}
zip上传的controller(包含字符编码问题解决方法):
package com.fj.report.controller;
import com.fj.report.common.utils.ZipFileUtils;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@RestController
@RequestMapping("/test-zip")
public class TestZipController {
@Autowired
private TestFileUploadController testFileUploadController;
@PostMapping(value = "/test", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public String test(
@Parameter(
description = "压缩文件",
content = @Content(mediaType = MediaType.APPLICATION_OCTET_STREAM_VALUE)
)
@RequestParam("file") MultipartFile file) {
List<MultipartFile> files;
try {
files = unzipToMultipartFiles(file, "UTF-8");
} catch (IOException e) {
try{
files = unzipToMultipartFiles(file, "GBK");
} catch (IOException ex) {
return "解压失败";
}
}
for(MultipartFile f : files){
System.out.println("文件名: " + f.getOriginalFilename() + ", 大小: " + f.getSize() + " 字节");
testFileUploadController.test(f);
}
return "解压成功,文件数量: " + files.size();
}
private List<MultipartFile> unzipToMultipartFiles(MultipartFile file, String encoding) throws IOException {
List<MultipartFile> list = new ArrayList<>();
try (InputStream is = file.getInputStream();
ZipArchiveInputStream zis = new ZipArchiveInputStream(is, encoding, false, true)){
ZipArchiveEntry entry;
while((entry = zis.getNextEntry()) != null){
if(entry.isDirectory() || entry.getName().startsWith("__MACOSX") || entry.getName().contains(".DS_Store")){
continue;
}
// 将当前条目的数据全部读取到内存字节数组中
byte[] bytes = IOUtils.toByteArray(zis);
// 封装成 MultipartFile 对象
MultipartFile multipartFile = new ZipFileUtils(
"file",
entry.getName(),
"application/octet-stream",
bytes
);
list.add(multipartFile);
}
return list;
}
}
}
调用模拟的OSS文件上传接口:
package com.fj.report.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
@Slf4j
@RestController
@RequestMapping("/test-file-upload")
@Tag(name = "测试文件上传", description = "测试文件上传相关操作API")
public class TestFileUploadController {
@PostMapping("/test")
@Operation(summary = "测试文件上传接口")
public void test(
@RequestParam MultipartFile file
){
log.info("收到文件: 名称 = {}, 大小 = {} 字节", file.getOriginalFilename(), file.getSize());
}
}