day7

#实习

定时任务接口抽取:

原场景:用户上传文件->调用阿里云文档解析->定时任务隔一段时间查询结果

现场景:用户上传文件->调用阿里云文档解析->结束(上传请求不再等待解析结果)

定时任务平台->调用定时任务接口(POST /schedule-parse-task/scan-and-process)->查询解析处理结果->写入数据库

package com.fj.report.controller;

import com.fj.report.service.ScheduleParseTaskService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/schedule-parse-task")
@Slf4j
@Tag(name = "定时解析任务", description = "定时解析任务相关操作API")
public class ScheduleParseTaskController {
    @Autowired
    private ScheduleParseTaskService scheduleParseTaskService;

    /**
     * 扫描并处理待处理的解析任务
     * 该接口可以被定时任务平台调用
     */
    @PostMapping("/scan-and-process")
    @Operation(summary = "扫描并处理待处理的解析任务")
    public ResponseEntity<Map<String, Object>> scanAndProcessPendingTasks() {
        Map<String, Object> result = new HashMap<>();
        try {
            log.info("开始扫描并处理解析任务");
            
            ScheduleParseTaskService.TaskProcessResult processResult = 
                    scheduleParseTaskService.scanAndProcessPendingTasks();
            
            result.put("success", true);
            result.put("message", "任务处理完成");
            result.put("processedCount", processResult.getProcessedCount());
            result.put("successCount", processResult.getSuccessCount());
            result.put("failureCount", processResult.getFailureCount());
            
            log.info("任务处理完成: 总处理数={}, 成功数={}, 失败数={}",
                    processResult.getProcessedCount(),
                    processResult.getSuccessCount(),
                    processResult.getFailureCount());
            
            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("扫描并处理解析任务异常", e);
            result.put("success", false);
            result.put("message", "任务处理异常: " + e.getMessage());
            return ResponseEntity.status(500).body(result);
        }
    }
}
package com.fj.report.service;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Scheduling-side service for document parse tasks: settles tasks that are still
 * pending or in progress by polling their state.
 */
public interface ScheduleParseTaskService {

    /**
     * Scans all unfinished parse tasks once and processes each of them.
     *
     * @return counters describing what this scan did
     */
    TaskProcessResult scanAndProcessPendingTasks();

    /**
     * Summary of one scan. Accessors, equals/hashCode/toString and both constructors are
     * generated by Lombok; the all-args constructor takes
     * (processedCount, successCount, failureCount) in declaration order.
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class TaskProcessResult {
        /** Number of tasks examined during the scan. */
        private Integer processedCount;

        /** Number of tasks counted as succeeded. */
        private Integer successCount;

        /** Number of tasks counted as failed. */
        private Integer failureCount;
    }
}
package com.fj.report.service.impl;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fj.report.entity.SysCaseFileParseTask;
import com.fj.report.service.ScheduleParseTaskService;
import com.fj.report.service.PdfRecognitionService;
import com.fj.report.service.SysCaseFileParseTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;

import static com.fj.report.common.comstants.CommonConstants.MAX_FILE_PARSE_TIMEOUT_MINUTES;

/**
 * Implementation of {@link ScheduleParseTaskService}.
 *
 * <p>Each scan loads every parse task whose status is 0 or 1, queries the third-party
 * document-parsing service for its state, and writes the result back through
 * {@link SysCaseFileParseTaskService}.
 *
 * @author fj
 */
@Service
@Slf4j
public class ScheduleParseTaskServiceImpl implements ScheduleParseTaskService {
    @Autowired
    private SysCaseFileParseTaskService sysCaseFileParseTaskService;
    @Autowired
    private PdfRecognitionService pdfRecognitionService;

    /**
     * What processing one task did during a scan.
     *
     * <p>Returned explicitly because every status change is persisted by task id through
     * {@link SysCaseFileParseTaskService}; the {@link SysCaseFileParseTask} instance loaded at
     * the start of the scan is never updated, so its status cannot be used to count outcomes.
     */
    private enum TaskOutcome {
        /** The third party reported success and fetchAndSaveResults was invoked. */
        SUCCEEDED,
        /** The task was marked failed during this scan. */
        FAILED,
        /** No final state yet; the task is left for the next scan. */
        PENDING
    }

    /**
     * Scans tasks with status 0 or 1 (oldest createTime first) and processes each of them.
     *
     * <p>An exception while processing one task is logged and counted as a failure without
     * aborting the scan; that task's stored status is left untouched, so it is retried later.
     *
     * @return total / succeeded / failed counts for this scan
     * @throws RuntimeException wrapping the cause if the scan itself (e.g. loading tasks) fails
     */
    @Override
    public TaskProcessResult scanAndProcessPendingTasks() {
        log.debug("扫描并处理待处理的解析任务...");

        int processedCount = 0;
        int successCount = 0;
        int failureCount = 0;

        try {
            // Status 0 (pending) and 1 (processing), oldest first.
            LambdaQueryWrapper<SysCaseFileParseTask> wrapper = new LambdaQueryWrapper<>();
            wrapper.in(SysCaseFileParseTask::getStatus, 0, 1)
                    .orderByAsc(SysCaseFileParseTask::getCreateTime);

            List<SysCaseFileParseTask> tasks = sysCaseFileParseTaskService.list(wrapper);

            if (tasks == null || tasks.isEmpty()) {
                log.debug("没有待处理的解析任务。");
                return new TaskProcessResult(0, 0, 0);
            }

            log.info("发现 {} 个待处理的解析任务", tasks.size());

            for (SysCaseFileParseTask task : tasks) {
                processedCount++;
                TaskOutcome outcome;
                try {
                    outcome = processTask(task);
                } catch (Exception e) {
                    log.error("处理任务异常: taskId={}", task.getId(), e);
                    outcome = TaskOutcome.FAILED;
                }

                if (outcome == TaskOutcome.SUCCEEDED) {
                    successCount++;
                } else if (outcome == TaskOutcome.FAILED) {
                    failureCount++;
                }
            }

            log.info("任务扫描完成: 总处理数={}, 成功数={}, 失败数={}",
                    processedCount, successCount, failureCount);

            return new TaskProcessResult(processedCount, successCount, failureCount);
        } catch (Exception e) {
            log.error("扫描任务异常", e);
            throw new RuntimeException("扫描任务异常: " + e.getMessage(), e);
        }
    }

    /**
     * Processes a single task: times it out if it has run too long, otherwise queries the
     * third-party status and dispatches on it.
     *
     * @param task the task row loaded by the scan (not refreshed after updates)
     * @return what this call did to the task
     */
    private TaskOutcome processTask(SysCaseFileParseTask task) {
        log.info("处理解析任务:taskId={},status={}", task.getId(), task.getStatus());

        if (isTaskTimeout(task)) {
            sysCaseFileParseTaskService.updateTaskFailed(task.getId(), "任务处理超时");
            log.warn("任务超时,更新状态为失败: taskId={}", task.getId());
            return TaskOutcome.FAILED;
        }

        String statusResult = pdfRecognitionService.queryDocParsingStatus(task.getThirdPlatformTaskId());
        if (statusResult == null) {
            // No answer this round; try again on the next scan.
            log.warn("查询第三方任务状态返回为null: taskId={}, thirdTaskId={}",
                    task.getId(), task.getThirdPlatformTaskId());
            return TaskOutcome.PENDING;
        }

        JSONObject statusJson;
        try {
            statusJson = JSON.parseObject(statusResult);
        } catch (Exception e) {
            log.error("解析第三方响应JSON异常: taskId={}, response={}", task.getId(), statusResult, e);
            sysCaseFileParseTaskService.updateTaskFailed(task.getId(), "响应解析异常: " + e.getMessage());
            return TaskOutcome.FAILED;
        }

        if (statusJson == null) {
            log.warn("状态响应JSON为null: taskId={}", task.getId());
            return TaskOutcome.PENDING;
        }

        JSONObject data = statusJson.getJSONObject("data");
        if (data == null) {
            log.warn("第三方响应中没有data字段: taskId={}, response={}", task.getId(), statusResult);
            return TaskOutcome.PENDING;
        }

        String status = data.getString("status");
        Integer numberOfSuccessfulParsing = data.getInteger("numberOfSuccessfulParsing");

        log.info("任务状态:taskId={},status={},numberOfSuccessfulParsing={}",
                task.getId(), status, numberOfSuccessfulParsing);

        return handleTaskStatus(task, status, numberOfSuccessfulParsing, statusJson, data);
    }

    /**
     * Applies the third-party status ("success", "fail", "processing", "init", compared
     * case-insensitively) to the task.
     *
     * @param task                      the task being processed
     * @param status                    value of {@code data.status}; may be null
     * @param numberOfSuccessfulParsing value of {@code data.numberOfSuccessfulParsing}; may be null
     * @param statusJson                whole response, used for top-level Code/Message fallbacks
     * @param data                      the {@code data} object of the response
     * @return SUCCEEDED after fetching results, FAILED after marking the task failed,
     *         PENDING otherwise
     */
    private TaskOutcome handleTaskStatus(SysCaseFileParseTask task, String status, Integer numberOfSuccessfulParsing,
                                         JSONObject statusJson, JSONObject data) {
        if (status == null) {
            log.warn("任务状态为null: taskId={}", task.getId());
            return TaskOutcome.PENDING;
        }

        if ("success".equalsIgnoreCase(status)) {
            log.info("任务成功,开始获取结果: taskId={}, numberOfSuccessfulParsing={}",
                    task.getId(), numberOfSuccessfulParsing);
            // NOTE(review): assumes fetchAndSaveResults persists the final state itself; if it
            // throws, the caller counts the task as failed.
            sysCaseFileParseTaskService.fetchAndSaveResults(task.getId(), numberOfSuccessfulParsing);
            return TaskOutcome.SUCCEEDED;
        }

        if ("fail".equalsIgnoreCase(status)) {
            // Prefer data.code/data.message, fall back to the top-level Code/Message fields.
            String failCode = data.getString("code");
            String failMessage = data.getString("message");
            if (failMessage == null) {
                failMessage = statusJson.getString("Message");
            }
            if (failCode == null) {
                failCode = statusJson.getString("Code");
            }

            log.error("任务失败: taskId={}, failCode={}, failMessage={}", task.getId(), failCode, failMessage);
            sysCaseFileParseTaskService.updateTaskFailed(task.getId(), failCode, failMessage);
            return TaskOutcome.FAILED;
        }

        if ("processing".equalsIgnoreCase(status)) {
            // Only write when the stored status is not already 1; the null check avoids the
            // unboxing NPE of a bare `!= 1` on a missing status.
            if (task.getStatus() == null || task.getStatus() != 1) {
                log.debug("任务处理中: taskId={}", task.getId());
                sysCaseFileParseTaskService.updateTaskProcessing(task.getId());
            }
            return TaskOutcome.PENDING;
        }

        if ("init".equalsIgnoreCase(status)) {
            log.debug("任务处于初始化状态,保持待处理状态:taskId={}", task.getId());
            return TaskOutcome.PENDING;
        }

        log.warn("未知的任务状态: taskId={}, status={}", task.getId(), status);
        return TaskOutcome.PENDING;
    }

    /**
     * Tells whether the task has exceeded {@code MAX_FILE_PARSE_TIMEOUT_MINUTES}.
     *
     * <p>Elapsed time is measured from processingStartTime, or from createTime when that is
     * null. {@code Duration.toMinutes()} truncates, so a task counts as timed out once at least
     * MAX_FILE_PARSE_TIMEOUT_MINUTES + 1 whole minutes have elapsed.
     *
     * @param task the task to check
     * @return true if timed out; false if it has not, or if neither timestamp is set
     */
    private boolean isTaskTimeout(SysCaseFileParseTask task) {
        LocalDateTime startTime = task.getProcessingStartTime() != null
                ? task.getProcessingStartTime()
                : task.getCreateTime();
        if (startTime == null) {
            // Age is unknown; do not fail the task on that basis.
            return false;
        }

        long minutes = java.time.Duration.between(startTime, LocalDateTime.now()).toMinutes();
        return minutes > MAX_FILE_PARSE_TIMEOUT_MINUTES;
    }
}

java解压zip文件

对应需求:用户上传zip压缩包,自动解析zip中的文件,上传到OSS

在 pom.xml 中引入依赖:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-compress</artifactId>
    <version>1.28.0</version>
</dependency>

需要注意的问题:1) 文件名编码问题:压缩包内的文件名可能是 UTF-8 编码,也可能是 GBK 编码(常见于 Windows 下创建的压缩包);2) OSS 上传接口接收的是 MultipartFile,需要自定义一个 MultipartFile 实现类,将解压得到的字节数组包装成该类型。

测试代码:

字节数组转 MultipartFile(自定义 MultipartFile 实现类):

package com.fj.report.common.utils;

import org.springframework.util.FileCopyUtils;
import org.springframework.web.multipart.MultipartFile;

import javax.validation.constraints.NotNull;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;

/**
 * In-memory {@link MultipartFile} backed by a byte array, used to hand bytes extracted from a
 * ZIP archive to APIs that accept {@code MultipartFile}.
 *
 * <p>Despite its name this is not a utility class; the name is kept for existing callers.
 * The byte array is held (and returned by {@link #getBytes()}) without copying.
 */
public class ZipFileUtils implements MultipartFile {
    private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";

    private final String name;
    private final String originalFilename;
    private final String contentType;
    private final byte[] content;

    /**
     * @param name             form-field name returned by {@link #getName()}
     * @param originalFilename file name returned by {@link #getOriginalFilename()}
     * @param contentType      MIME type; null falls back to {@code application/octet-stream}
     * @param content          file bytes; null is treated as empty content
     */
    public ZipFileUtils(String name, String originalFilename, String contentType, byte[] content) {
        this.name = name;
        this.originalFilename = originalFilename;
        this.contentType = contentType != null ? contentType : DEFAULT_CONTENT_TYPE;
        // A null array would make isEmpty()/getSize()/getInputStream()/transferTo() throw NPE.
        this.content = content != null ? content : new byte[0];
    }

    @Override
    @NotNull
    public String getName() {
        return this.name;
    }

    @Override
    public String getOriginalFilename() {
        return this.originalFilename;
    }

    @Override
    public String getContentType() {
        return this.contentType;
    }

    @Override
    public boolean isEmpty() {
        return this.content.length == 0;
    }

    @Override
    public long getSize() {
        return this.content.length;
    }

    @Override
    @NotNull
    public byte[] getBytes() throws IOException {
        return this.content;
    }

    @Override
    @NotNull
    public InputStream getInputStream() throws IOException {
        return new ByteArrayInputStream(this.content);
    }

    /**
     * Writes the content to {@code dest}, replacing any existing file.
     */
    @Override
    public void transferTo(@NotNull File dest) throws IOException, IllegalStateException {
        FileCopyUtils.copy(this.content, dest);
    }
}

zip上传的controller(包含字符编码问题解决方法):

package com.fj.report.controller;

import com.fj.report.common.utils.ZipFileUtils;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@RestController
@RequestMapping("/test-zip")
public class TestZipController {
    @Autowired
    private TestFileUploadController testFileUploadController;

    @PostMapping(value = "/test", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public String test(
            @Parameter(
                    description = "压缩文件",
                    content = @Content(mediaType = MediaType.APPLICATION_OCTET_STREAM_VALUE)
            )
            @RequestParam("file") MultipartFile file) {
        List<MultipartFile> files;
        try {
            files = unzipToMultipartFiles(file, "UTF-8");
        } catch (IOException e) {
            try{
                files = unzipToMultipartFiles(file, "GBK");
            } catch (IOException ex) {
                return "解压失败";
            }
        }

        for(MultipartFile f : files){
            System.out.println("文件名: " + f.getOriginalFilename() + ", 大小: " + f.getSize() + " 字节");
            testFileUploadController.test(f);
        }
        return "解压成功,文件数量: " + files.size();
    }

    private List<MultipartFile> unzipToMultipartFiles(MultipartFile file, String encoding) throws IOException {
        List<MultipartFile> list = new ArrayList<>();
        try (InputStream is = file.getInputStream();
             ZipArchiveInputStream zis = new ZipArchiveInputStream(is, encoding, false, true)){
            ZipArchiveEntry entry;
            while((entry = zis.getNextEntry()) != null){
                if(entry.isDirectory() || entry.getName().startsWith("__MACOSX") || entry.getName().contains(".DS_Store")){
                    continue;
                }

                // 将当前条目的数据全部读取到内存字节数组中
                byte[] bytes = IOUtils.toByteArray(zis);

                // 封装成 MultipartFile 对象
                MultipartFile multipartFile = new ZipFileUtils(
                        "file",
                        entry.getName(),
                        "application/octet-stream",
                        bytes
                );
                list.add(multipartFile);
            }
            return list;
        }
    }
}

调用模拟的OSS文件上传接口:

package com.fj.report.controller;


import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

/**
 * Stand-in for the OSS upload endpoint used while testing: it only logs what it receives.
 */
@Slf4j
@RestController
@RequestMapping("/test-file-upload")
@Tag(name = "测试文件上传", description = "测试文件上传相关操作API")
public class TestFileUploadController {

    /**
     * Logs the original filename and size of the uploaded file.
     *
     * @param file the multipart part named {@code file}; the explicit name avoids relying on
     *             the {@code -parameters} compiler flag to resolve the parameter name
     */
    @PostMapping("/test")
    @Operation(summary = "测试文件上传接口")
    public void test(@RequestParam("file") MultipartFile file) {
        log.info("收到文件: 名称 = {}, 大小 = {} 字节", file.getOriginalFilename(), file.getSize());
    }
}
← 返回首页