day6

#实习

创建接口:根据"材料 + 用户提示词"调用 DeepSeek,然后从返回的流式结果中提取 reasoningContent 字段和 content 字段。

package com.fj.report.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fj.report.dto.DeepSeekResponse;
import com.fj.report.entity.SysCaseFile;
import com.fj.report.entity.SysCaseFileParseTask;
import com.fj.report.service.AiServiceWithThinking;
import com.fj.report.service.SysCaseFileParseTaskService;
import com.fj.report.service.SysCaseFileService;
import com.fj.report.vo.ArticleVos;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;


/**
 * {@link AiServiceWithThinking} implementation that combines parsed case materials with a user
 * prompt, posts them to the chat-completions endpoint configured under {@code spring.ai.openai.*},
 * and relays the streamed answer.
 *
 * <p>The returned stream is plain text: a {@code -------thinking-------} marker followed by the
 * {@code reasoningContent} fragments, then a {@code -------content-------} marker followed by the
 * {@code content} fragments.
 */
@Slf4j
@Service
public class AiServiceWithThinkingImpl implements AiServiceWithThinking {

    /** Marker emitted once, right before the first reasoning fragment. */
    private static final String THINKING_HEADER = "-------thinking-------\n";

    /** Marker emitted once, right before the first content fragment. */
    private static final String CONTENT_HEADER = "-------content-------\n";

    /** Payload that signals the end of the upstream stream. */
    private static final String DONE_MARKER = "[DONE]";

    /** Parse-task status value that the material lookup treats as "completed". */
    private static final int COMPLETED_PARSE_STATUS = 2;

    private final WebClient webClient;
    private final String model;
    @Autowired
    private SysCaseFileService sysCaseFileService;
    @Autowired
    private SysCaseFileParseTaskService parseTaskService;
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Builds the HTTP client used for all completion requests.
     *
     * @param apiKey  API key sent as a {@code Bearer} token on every request
     * @param baseUrl endpoint base URL; {@code /v1} is appended to it
     * @param model   model name placed in each request body
     */
    public AiServiceWithThinkingImpl(@Value("${spring.ai.openai.api-key}") String apiKey,
                                     @Value("${spring.ai.openai.base-url}") String baseUrl,
                                     @Value("${spring.ai.openai.chat.options.model}") String model) {
        this.webClient = WebClient.builder()
                .baseUrl(baseUrl + "/v1")  // make sure the full API path is used
                .defaultHeader("Authorization", "Bearer " + apiKey)
                .build();
        this.model = model;
    }

    /**
     * Generates a streamed answer from the given case materials and user prompt.
     *
     * <p>The materials are loaded and the request body is serialized immediately when this method
     * is called; the HTTP call itself and all per-stream bookkeeping start on subscription, so
     * every subscriber gets fresh header flags and accumulators.
     *
     * @param caseId           case whose materials are used; may be {@code null} (no materials)
     * @param articlesFileName file names of the materials to include; may be {@code null} or empty
     * @param userPrompt       user instruction appended after the materials; {@code null} is
     *                         treated as empty
     * @return the marker lines and text fragments in upstream order, or an error signal when the
     *         request body cannot be serialized
     */
    @Override
    public Flux<String> generateFromArticlesStream(Long caseId, List<String> articlesFileName, String userPrompt) {
        // The lookup below tolerates a null name list, so the log line must not dereference it.
        int fileNameCount = articlesFileName == null ? 0 : articlesFileName.size();
        log.info("接收到流式材料生成请求,案件Id: {}, 材料名个数: {}, 提示词: {}", caseId, fileNameCount, userPrompt);

        // 1. Load the parsed content of the requested materials.
        List<ArticleVos> articles = fetchArticlesByCaseIdAndFileNames(caseId, articlesFileName);

        // 2. Prepend the materials to the user prompt.
        String fullUserPrompt = buildFullUserPrompt(articles, userPrompt);
        log.info("完整用户提示词构建完成,长度: {}", fullUserPrompt.length());

        // 3. Serialize the request body.
        String requestBody;
        try {
            requestBody = buildRequestBody(fullUserPrompt);
        } catch (JsonProcessingException e) {
            log.error("序列化请求体失败", e);
            return Flux.error(e);
        }
        log.debug("发送请求到DeepSeek API,请求体长度: {}", requestBody.length());

        // Defer so that the mutable per-stream state is created once per subscription rather
        // than once per method call.
        return Flux.defer(() -> streamCompletion(requestBody));
    }

    /**
     * Renders each material as a {@code ## fileName} section and appends the user prompt.
     */
    private String buildFullUserPrompt(List<ArticleVos> articles, String userPrompt) {
        StringBuilder prompt = new StringBuilder();
        if (articles != null) {
            for (ArticleVos article : articles) {
                prompt.append("## ").append(article.getFileName()).append("\n\n");
                prompt.append(article.getContent()).append("\n\n");
                prompt.append("-----").append("\n\n");
            }
        }
        prompt.append("\n");
        // A null prompt must not end up as the literal text "null" in the model input.
        if (userPrompt != null) {
            prompt.append(userPrompt);
        }
        return prompt.toString();
    }

    /**
     * Serializes the chat-completions request; going through a Map and ObjectMapper lets Jackson
     * escape newlines and other special characters in the prompt.
     */
    private String buildRequestBody(String fullUserPrompt) throws JsonProcessingException {
        Map<String, String> message = new HashMap<>();
        message.put("role", "user");
        message.put("content", fullUserPrompt);

        Map<String, String> thinking = new HashMap<>();
        thinking.put("type", "enabled");

        Map<String, Object> requestMap = new HashMap<>();
        requestMap.put("model", this.model);
        requestMap.put("messages", List.of(message));
        requestMap.put("stream", true);
        requestMap.put("thinking", thinking);

        return objectMapper.writeValueAsString(requestMap);
    }

    /**
     * Posts the request and converts each upstream chunk into marker lines and text fragments.
     */
    private Flux<String> streamCompletion(String requestBody) {
        // Track whether the thinking / content markers were already emitted for this stream.
        AtomicBoolean thinkingHeaderSent = new AtomicBoolean(false);
        AtomicBoolean contentHeaderSent = new AtomicBoolean(false);
        // Accumulate the complete texts so they can be logged when the stream finishes.
        StringBuilder reasoningBuilder = new StringBuilder();
        StringBuilder contentBuilder = new StringBuilder();

        return webClient.post()
                .uri("/chat/completions")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(requestBody)
                .retrieve()
                .bodyToFlux(String.class)
                // concatMap guarantees that fragments are emitted in upstream order and that the
                // (non-thread-safe) builders are only touched sequentially.
                .concatMap(line -> transformToSSEStream(line, thinkingHeaderSent, contentHeaderSent,
                        reasoningBuilder, contentBuilder))
                // Placed after the transformation so that failures in either stage are logged.
                .doOnError(e -> log.error("流处理错误", e))
                .doOnComplete(() -> {
                    log.info("========== 流式响应完成 ==========");
                    log.info("完整思考内容:\n{}", reasoningBuilder.toString());
                    log.info("完整生成内容:\n{}", contentBuilder.toString());
                    log.info("========== 响应完成 ==========");
                });
    }

    /**
     * Converts one upstream chunk into the text pieces to forward to the caller.
     *
     * <p>Blank chunks, chunks without choices or delta, and chunks that fail JSON parsing produce
     * nothing. The {@code [DONE]} chunk emits the content marker if reasoning was streamed but no
     * content marker has been sent yet.
     *
     * @param rawLine            one element of the decoded response body
     * @param thinkingHeaderSent set once the thinking marker has been emitted
     * @param contentHeaderSent  set once the content marker has been emitted
     * @param reasoningBuilder   accumulates every reasoning fragment
     * @param contentBuilder     accumulates every content fragment
     * @return the pieces to emit for this chunk, possibly empty
     */
    private Flux<String> transformToSSEStream(String rawLine, AtomicBoolean thinkingHeaderSent,
                                              AtomicBoolean contentHeaderSent, StringBuilder reasoningBuilder,
                                              StringBuilder contentBuilder) {

        // 1. Drop blank chunks and handle the end-of-stream marker.
        if (rawLine == null || rawLine.isBlank()) {
            log.debug("过滤空行");
            return Flux.empty();
        }

        String jsonPart = rawLine.trim();

        if (DONE_MARKER.equals(jsonPart)) {
            log.debug("收到结束标志");
            // Reasoning was streamed but no content arrived: still close the thinking section
            // by emitting the content marker.
            if (thinkingHeaderSent.get() && contentHeaderSent.compareAndSet(false, true)) {
                return Flux.just(CONTENT_HEADER);
            }
            return Flux.empty();
        }

        // 2. Parse the JSON chunk and extract the delta.
        try {
            DeepSeekResponse deepSeekResponse = objectMapper.readValue(jsonPart, DeepSeekResponse.class);
            if (deepSeekResponse.getChoices() == null || deepSeekResponse.getChoices().isEmpty()) {
                log.debug("choices为空");
                return Flux.empty();
            }

            var delta = deepSeekResponse.getChoices().get(0).getDelta();
            if (delta == null) {
                // A choice without a delta carries no text; skipping it avoids an NPE that would
                // otherwise terminate the whole stream.
                log.debug("delta为空");
                return Flux.empty();
            }

            List<String> result = new ArrayList<>();

            // 3. Reasoning ("thinking") fragment.
            String thinkingContent = delta.getReasoningContent();
            if (thinkingContent != null && !thinkingContent.isEmpty()) {
                log.debug("提取thinking内容,长度: {}", thinkingContent.length());

                // Emit the marker exactly once, before the first fragment.
                if (thinkingHeaderSent.compareAndSet(false, true)) {
                    result.add(THINKING_HEADER);
                }

                result.add(thinkingContent);
                reasoningBuilder.append(thinkingContent);
            }

            // 4. Answer ("content") fragment.
            String contentText = delta.getContent();
            if (contentText != null && !contentText.isEmpty()) {
                log.debug("提取content内容,长度: {}", contentText.length());

                // Emit the marker exactly once, before the first fragment.
                if (contentHeaderSent.compareAndSet(false, true)) {
                    result.add(CONTENT_HEADER);
                }

                result.add(contentText);
                contentBuilder.append(contentText);
            }

            return Flux.fromIterable(result);

        } catch (JsonProcessingException e) {
            // An unparsable chunk is skipped instead of failing the stream.
            log.warn("解析行失败:{}", jsonPart, e);
            return Flux.empty();
        }
    }

    /**
     * Loads the parsed content of the named material files of a case.
     *
     * <p>Only files that have a completed parse task with non-empty content are returned. When a
     * file has several completed tasks, the most recently created one is used. Any exception
     * during the lookup is logged and whatever was collected so far is returned.
     *
     * @param caseId           case ID
     * @param articlesFileName file names to look up
     * @return the materials found; empty when an argument is {@code null}/empty or nothing matches
     */
    private List<ArticleVos> fetchArticlesByCaseIdAndFileNames(Long caseId, List<String> articlesFileName) {
        log.info("获取案件材料内容: caseId={}, fileNames={}", caseId, articlesFileName);

        if (caseId == null || articlesFileName == null || articlesFileName.isEmpty()) {
            log.warn("参数为空,返回空列表");
            return new ArrayList<>();
        }

        List<ArticleVos> result = new ArrayList<>();

        try {
            // 1. Find the case files matching the case ID and file names.
            LambdaQueryWrapper<SysCaseFile> fileWrapper = new LambdaQueryWrapper<>();
            fileWrapper.eq(SysCaseFile::getCaseId, caseId)
                    .in(SysCaseFile::getFileName, articlesFileName);

            List<SysCaseFile> caseFiles = sysCaseFileService.list(fileWrapper);

            if (caseFiles.isEmpty()) {
                log.warn("未找到匹配的案件文件");
                return result;
            }

            // 2. Collect the case-file IDs.
            List<Long> caseFileIds = caseFiles.stream()
                    .map(SysCaseFile::getCaseFileId)
                    .collect(Collectors.toList());

            // 3. Query the completed parse tasks, newest first.
            LambdaQueryWrapper<SysCaseFileParseTask> taskWrapper = new LambdaQueryWrapper<>();
            taskWrapper.in(SysCaseFileParseTask::getCaseFileId, caseFileIds)
                    .eq(SysCaseFileParseTask::getStatus, COMPLETED_PARSE_STATUS)
                    .orderByDesc(SysCaseFileParseTask::getCreateTime);

            List<SysCaseFileParseTask> parseTasks = parseTaskService.list(taskWrapper);

            // 4. Map each case-file ID to its task; on duplicates keep the first one, which is
            //    the newest because of the descending sort above.
            Map<Long, SysCaseFileParseTask> caseFileIdToTaskMap = parseTasks.stream()
                    .collect(Collectors.toMap(
                            SysCaseFileParseTask::getCaseFileId,
                            task -> task,
                            (existing, replacement) -> existing
                    ));

            // 5. Assemble the result.
            for (SysCaseFile caseFile : caseFiles) {
                SysCaseFileParseTask task = caseFileIdToTaskMap.get(caseFile.getCaseFileId());

                if (task == null) {
                    log.warn("文件未解析或解析失败: fileName={}", caseFile.getFileName());
                    continue;
                }

                String markdown = task.getResultMarkdown();
                String text = task.getResultText();
                boolean hasMarkdown = markdown != null && !markdown.isEmpty();
                boolean hasText = text != null && !text.isEmpty();

                // Prefer markdown, except when it contains "http" and a usable plain-text
                // version exists. An empty text no longer shadows a non-empty markdown.
                String content = null;
                String contentType = null;
                if (hasMarkdown) {
                    if (markdown.contains("http") && hasText) {
                        content = text;
                        contentType = "text";
                        log.debug("文件包含http链接,使用text: fileName={}", caseFile.getFileName());
                    } else {
                        content = markdown;
                        contentType = "markdown";
                        log.debug("使用markdown内容: fileName={}", caseFile.getFileName());
                    }
                } else if (hasText) {
                    content = text;
                    contentType = "text";
                    log.debug("markdown为空,使用text: fileName={}", caseFile.getFileName());
                }

                if (content == null) {
                    log.warn("文件内容为空: fileName={}", caseFile.getFileName());
                    continue;
                }

                ArticleVos article = new ArticleVos();
                article.setFileName(caseFile.getFileName());
                article.setContent(content);
                result.add(article);

                // The label reflects the source that was actually chosen.
                log.debug("添加文件内容: fileName={}, contentLength={}, type={}",
                        caseFile.getFileName(), content.length(), contentType);
            }

            log.info("成功获取 {} 个文件的内容", result.size());

        } catch (Exception e) {
            // Best effort: a lookup failure must not break generation, so return what we have.
            log.error("获取案件材料内容失败: caseId={}", caseId, e);
        }

        return result;
    }
}
← 返回首页