day6
创建接口:根据材料+用户提示词来调用DeepSeek,然后获取返回的流式结果中的reasoningContent字段和content字段
package com.fj.report.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fj.report.dto.DeepSeekResponse;
import com.fj.report.entity.SysCaseFile;
import com.fj.report.entity.SysCaseFileParseTask;
import com.fj.report.service.AiServiceWithThinking;
import com.fj.report.service.SysCaseFileParseTaskService;
import com.fj.report.service.SysCaseFileService;
import com.fj.report.vo.ArticleVos;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
 * Streams chat completions from the DeepSeek API with "thinking" enabled.
 *
 * <p>The parsed content of the requested case files is prepended to the user prompt, the
 * request is sent with {@code stream=true}, and every streamed chunk is split into its
 * {@code reasoningContent} and {@code content} parts. Each part is announced once by a marker
 * line ({@code -------thinking-------} / {@code -------content-------}) before its first chunk.
 */
@Slf4j
@Service
public class AiServiceWithThinkingImpl implements AiServiceWithThinking {

    /** Marker emitted once, before the first reasoning chunk. */
    private static final String THINKING_HEADER = "-------thinking-------\n";

    /** Marker emitted once, before the first answer chunk. */
    private static final String CONTENT_HEADER = "-------content-------\n";

    /** Payload that terminates the upstream event stream. */
    private static final String DONE_MARKER = "[DONE]";

    /** Field prefix of a raw (undecoded) server-sent-event data line. */
    private static final String SSE_DATA_PREFIX = "data:";

    /** Status value of a completed parse task. */
    private static final int PARSE_STATUS_COMPLETED = 2;

    private final WebClient webClient;
    private final String model;

    @Autowired
    private SysCaseFileService sysCaseFileService;

    @Autowired
    private SysCaseFileParseTaskService parseTaskService;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Builds the HTTP client used for all completion requests.
     *
     * @param apiKey  API key sent as a bearer token on every request
     * @param baseUrl API base URL; {@code /v1} is appended to it
     * @param model   model name placed in every request body
     */
    public AiServiceWithThinkingImpl(@Value("${spring.ai.openai.api-key}") String apiKey,
                                     @Value("${spring.ai.openai.base-url}") String baseUrl,
                                     @Value("${spring.ai.openai.chat.options.model}") String model) {
        this.webClient = WebClient.builder()
                // Strip trailing slashes first so a configured "https://host/" does not become "https://host//v1".
                .baseUrl(stripTrailingSlashes(baseUrl) + "/v1")
                .defaultHeader("Authorization", "Bearer " + apiKey)
                .build();
        this.model = model;
    }

    /**
     * Generates a streamed answer from the given case materials and user prompt.
     *
     * @param caseId           case whose files are used as materials
     * @param articlesFileName names of the case files to include; may be null or empty
     * @param userPrompt       prompt appended after the materials; null is treated as empty
     * @return the marker lines and text chunks in arrival order, or an error signal when the
     *         request body cannot be serialized or the HTTP call fails
     */
    @Override
    public Flux<String> generateFromArticlesStream(Long caseId, List<String> articlesFileName, String userPrompt) {
        // The list is allowed to be null (the fetch below tolerates it), so do not dereference it blindly.
        int fileNameCount = articlesFileName == null ? 0 : articlesFileName.size();
        log.info("接收到流式材料生成请求,案件Id: {}, 材料名个数: {}, 提示词: {}", caseId, fileNameCount, userPrompt);

        // 1. Load the parsed content of the requested case files.
        List<ArticleVos> articles = fetchArticlesByCaseIdAndFileNames(caseId, articlesFileName);

        // 2. Prepend the materials to the user prompt.
        String fullUserPrompt = buildFullUserPrompt(articles, userPrompt);
        log.info("完整用户提示词构建完成,长度: {}", fullUserPrompt.length());

        // 3. Serialize the request; ObjectMapper takes care of escaping newlines and quotes.
        String requestBody;
        try {
            requestBody = buildRequestBody(fullUserPrompt);
        } catch (JsonProcessingException e) {
            log.error("序列化请求体失败", e);
            return Flux.error(e);
        }
        log.debug("发送请求到DeepSeek API,请求体长度: {}", requestBody.length());

        // 4. Defer so every subscription gets its own header flags and transcript buffers;
        //    otherwise a re-subscription would reuse stale flags and append to old buffers.
        return Flux.defer(() -> {
            AtomicBoolean thinkingHeaderSent = new AtomicBoolean(false);
            AtomicBoolean contentHeaderSent = new AtomicBoolean(false);
            StringBuilder reasoningBuilder = new StringBuilder();
            StringBuilder contentBuilder = new StringBuilder();

            return webClient.post()
                    .uri("/chat/completions")
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(requestBody)
                    .retrieve()
                    .bodyToFlux(String.class)
                    .doOnError(e -> log.error("流处理错误", e))
                    // concatMap (not flatMap): chunk order must be preserved.
                    .concatMap(line -> transformToSSEStream(line, thinkingHeaderSent, contentHeaderSent,
                            reasoningBuilder, contentBuilder))
                    .doOnComplete(() -> {
                        log.info("========== 流式响应完成 ==========");
                        log.info("完整思考内容:\n{}", reasoningBuilder.toString());
                        log.info("完整生成内容:\n{}", contentBuilder.toString());
                        log.info("========== 响应完成 ==========");
                    });
        });
    }

    /**
     * Removes every trailing {@code '/'} from the given URL.
     *
     * @param url URL to normalize; may be null
     * @return the URL without trailing slashes, or null when {@code url} is null
     */
    private static String stripTrailingSlashes(String url) {
        if (url == null) {
            return null;
        }
        int end = url.length();
        while (end > 0 && url.charAt(end - 1) == '/') {
            end--;
        }
        return url.substring(0, end);
    }

    /**
     * Renders each article as a titled section followed by a separator, then appends the prompt.
     *
     * @param articles   materials to render; may be null or empty
     * @param userPrompt prompt to append; null is treated as empty rather than the text "null"
     * @return the combined prompt text
     */
    private String buildFullUserPrompt(List<ArticleVos> articles, String userPrompt) {
        StringBuilder materialsContent = new StringBuilder();
        if (articles != null) {
            for (ArticleVos article : articles) {
                materialsContent.append("## ").append(article.getFileName()).append("\n\n");
                materialsContent.append(article.getContent()).append("\n\n");
                materialsContent.append("-----").append("\n\n");
            }
        }
        String prompt = userPrompt == null ? "" : userPrompt;
        return materialsContent.append("\n").append(prompt).toString();
    }

    /**
     * Serializes a single-message, streaming chat request with thinking enabled.
     *
     * @param fullUserPrompt content of the single user message
     * @return the request body as JSON
     * @throws JsonProcessingException when the body cannot be serialized
     */
    private String buildRequestBody(String fullUserPrompt) throws JsonProcessingException {
        Map<String, String> message = new HashMap<>();
        message.put("role", "user");
        message.put("content", fullUserPrompt);

        Map<String, String> thinking = new HashMap<>();
        thinking.put("type", "enabled");

        Map<String, Object> requestMap = new HashMap<>();
        requestMap.put("model", this.model);
        requestMap.put("messages", List.of(message));
        requestMap.put("stream", true);
        requestMap.put("thinking", thinking);
        return objectMapper.writeValueAsString(requestMap);
    }

    /**
     * Converts one upstream line into zero or more output strings.
     *
     * <p>Blank lines, comment lines, unparseable lines and chunks without choices or delta
     * produce nothing. The end marker produces the content header only when reasoning was
     * streamed but no content ever arrived.
     *
     * @param rawLine            one element of the response body stream
     * @param thinkingHeaderSent set once the thinking header has been emitted
     * @param contentHeaderSent  set once the content header has been emitted
     * @param reasoningBuilder   accumulates all reasoning text for the completion log
     * @param contentBuilder     accumulates all answer text for the completion log
     * @return the strings to emit for this line, in order
     */
    private Flux<String> transformToSSEStream(String rawLine, AtomicBoolean thinkingHeaderSent,
                                              AtomicBoolean contentHeaderSent, StringBuilder reasoningBuilder,
                                              StringBuilder contentBuilder) {
        // 1. Drop blank lines.
        if (rawLine == null || rawLine.isBlank()) {
            log.debug("过滤空行");
            return Flux.empty();
        }

        // When the body is not decoded as server-sent events, lines arrive raw:
        // "data: {...}" for payloads and ":..." for comments. Normalize both.
        String payload = rawLine.trim();
        if (payload.startsWith(SSE_DATA_PREFIX)) {
            payload = payload.substring(SSE_DATA_PREFIX.length()).trim();
        } else if (payload.startsWith(":")) {
            return Flux.empty();
        }
        if (payload.isEmpty()) {
            log.debug("过滤空行");
            return Flux.empty();
        }

        // 2. End-of-stream marker.
        if (DONE_MARKER.equals(payload)) {
            log.debug("收到结束标志");
            // Reasoning was streamed but no content followed: still announce the content section.
            if (thinkingHeaderSent.get() && contentHeaderSent.compareAndSet(false, true)) {
                return Flux.just(CONTENT_HEADER);
            }
            return Flux.empty();
        }

        // 3. Parse the JSON chunk and extract its text.
        try {
            DeepSeekResponse deepSeekResponse = objectMapper.readValue(payload, DeepSeekResponse.class);
            if (deepSeekResponse == null
                    || deepSeekResponse.getChoices() == null
                    || deepSeekResponse.getChoices().isEmpty()) {
                log.debug("choices为空");
                return Flux.empty();
            }
            var choice = deepSeekResponse.getChoices().get(0);
            var delta = choice == null ? null : choice.getDelta();
            if (delta == null) {
                // A chunk without a delta carries no text; skip it instead of failing the stream.
                return Flux.empty();
            }

            List<String> chunks = new ArrayList<>();

            // 4. Reasoning ("thinking") text.
            String reasoning = delta.getReasoningContent();
            if (reasoning != null && !reasoning.isEmpty()) {
                log.debug("提取thinking内容,长度: {}", reasoning.length());
                if (thinkingHeaderSent.compareAndSet(false, true)) {
                    chunks.add(THINKING_HEADER);
                }
                chunks.add(reasoning);
                reasoningBuilder.append(reasoning);
            }

            // 5. Answer text.
            String answer = delta.getContent();
            if (answer != null && !answer.isEmpty()) {
                log.debug("提取content内容,长度: {}", answer.length());
                if (contentHeaderSent.compareAndSet(false, true)) {
                    chunks.add(CONTENT_HEADER);
                }
                chunks.add(answer);
                contentBuilder.append(answer);
            }
            return Flux.fromIterable(chunks);
        } catch (JsonProcessingException e) {
            // One malformed line must not abort the stream.
            log.warn("解析行失败:{}", payload, e);
            return Flux.empty();
        }
    }

    /**
     * Loads the parsed content of the named files of a case.
     *
     * <p>For each matching file the newest completed parse task is used. Markdown is preferred,
     * unless it contains {@code "http"} and a non-empty plain-text result exists, in which case
     * the plain text is used. Files without a completed task or without any content are skipped.
     * This is best effort: on any exception the failure is logged and whatever was collected so
     * far is returned.
     *
     * @param caseId           case ID
     * @param articlesFileName file names to look up
     * @return the materials found; empty when an argument is null or empty, or nothing matches
     */
    private List<ArticleVos> fetchArticlesByCaseIdAndFileNames(Long caseId, List<String> articlesFileName) {
        log.info("获取案件材料内容: caseId={}, fileNames={}", caseId, articlesFileName);
        if (caseId == null || articlesFileName == null || articlesFileName.isEmpty()) {
            log.warn("参数为空,返回空列表");
            return new ArrayList<>();
        }

        List<ArticleVos> result = new ArrayList<>();
        try {
            // 1. Find the case files by case ID and file name.
            LambdaQueryWrapper<SysCaseFile> fileWrapper = new LambdaQueryWrapper<>();
            fileWrapper.eq(SysCaseFile::getCaseId, caseId)
                    .in(SysCaseFile::getFileName, articlesFileName);
            List<SysCaseFile> caseFiles = sysCaseFileService.list(fileWrapper);
            if (caseFiles == null || caseFiles.isEmpty()) {
                log.warn("未找到匹配的案件文件");
                return result;
            }

            // 2. Collect their IDs.
            List<Long> caseFileIds = caseFiles.stream()
                    .map(SysCaseFile::getCaseFileId)
                    .collect(Collectors.toList());

            // 3. Load the completed parse tasks, newest first.
            LambdaQueryWrapper<SysCaseFileParseTask> taskWrapper = new LambdaQueryWrapper<>();
            taskWrapper.in(SysCaseFileParseTask::getCaseFileId, caseFileIds)
                    .eq(SysCaseFileParseTask::getStatus, PARSE_STATUS_COMPLETED)
                    .orderByDesc(SysCaseFileParseTask::getCreateTime);
            List<SysCaseFileParseTask> parseTasks = parseTaskService.list(taskWrapper);

            // 4. Index tasks by file ID; on duplicates keep the first one, which is the newest.
            Map<Long, SysCaseFileParseTask> caseFileIdToTaskMap = parseTasks.stream()
                    .collect(Collectors.toMap(
                            SysCaseFileParseTask::getCaseFileId,
                            task -> task,
                            (existing, replacement) -> existing));

            // 5. Assemble the result.
            for (SysCaseFile caseFile : caseFiles) {
                String fileName = caseFile.getFileName();
                SysCaseFileParseTask task = caseFileIdToTaskMap.get(caseFile.getCaseFileId());
                if (task == null) {
                    log.warn("文件未解析或解析失败: fileName={}", fileName);
                    continue;
                }

                String markdown = task.getResultMarkdown();
                String text = task.getResultText();
                boolean hasMarkdown = markdown != null && !markdown.isEmpty();
                boolean hasText = text != null && !text.isEmpty();

                String content = null;
                String contentType = null;
                if (hasMarkdown && hasText && markdown.contains("http")) {
                    // Markdown carries links: use the plain text, but only when it really has content.
                    content = text;
                    contentType = "text";
                    log.debug("文件包含http链接,使用text: fileName={}", fileName);
                } else if (hasMarkdown) {
                    content = markdown;
                    contentType = "markdown";
                    log.debug("使用markdown内容: fileName={}", fileName);
                } else if (hasText) {
                    content = text;
                    contentType = "text";
                    log.debug("markdown为空,使用text: fileName={}", fileName);
                }

                if (content == null) {
                    log.warn("文件内容为空: fileName={}", fileName);
                    continue;
                }

                ArticleVos article = new ArticleVos();
                article.setFileName(fileName);
                article.setContent(content);
                result.add(article);
                log.debug("添加文件内容: fileName={}, contentLength={}, type={}",
                        fileName, content.length(), contentType);
            }
            log.info("成功获取 {} 个文件的内容", result.size());
        } catch (Exception e) {
            // Deliberate best effort: a lookup failure degrades to fewer or no materials.
            log.error("获取案件材料内容失败: caseId={}", caseId, e);
        }
        return result;
    }
}