|
@@ -0,0 +1,198 @@
|
|
|
+package org.eco.system.service.impl;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import jakarta.annotation.Resource;
|
|
|
+import org.eco.common.orm.core.service.impl.BaseServiceImpl;
|
|
|
+import org.eco.common.security.utils.LoginHelper;
|
|
|
+import org.eco.common.websocket.utils.WebSocketUtils;
|
|
|
+import org.eco.system.domain.po.VideoPo;
|
|
|
+import org.eco.system.domain.vo.VideoVO;
|
|
|
+import org.eco.system.mapper.TaskMapper;
|
|
|
+import org.eco.system.mapper.VideoMapper;
|
|
|
+import org.eco.system.service.IVideofile;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.annotation.Propagation;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.DataOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.net.HttpURLConnection;
|
|
|
+import java.net.URL;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+@Service
|
|
|
+public class VideofileServiceImpl extends BaseServiceImpl<VideoMapper, VideoVO> implements IVideofile, org.springframework.beans.factory.DisposableBean {
|
|
|
+
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(VideofileServiceImpl.class);
|
|
|
+
|
|
|
+ private final ExecutorService executorService;
|
|
|
+
|
|
|
+ @Value("${model.httpurl}")
|
|
|
+ private String modelPath;
|
|
|
+
|
|
|
+ @Value("${video.thread.pool.size:5}")
|
|
|
+ private int threadPoolSize;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private TaskMapper taskMapper;
|
|
|
+
|
|
|
+
|
|
|
+ public VideofileServiceImpl(@Value("${video.thread.pool.size:5}") int threadPoolSize) {
|
|
|
+ this.executorService = Executors.newFixedThreadPool(threadPoolSize);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建视频生成任务
|
|
|
+ * @param vo
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public VideoPo createVideo(VideoVO vo) throws IOException {
|
|
|
+ VideoPo po = new VideoPo();
|
|
|
+ po.setUserId(LoginHelper.getUserId().toString());
|
|
|
+ po.setVideoName(vo.getVideoName());
|
|
|
+ po.setStatus("pending");
|
|
|
+ int insert = taskMapper.insert(po);
|
|
|
+
|
|
|
+ // 准备API请求参数
|
|
|
+ Map<String, Object> httpMap = prepareHttpParams(vo);
|
|
|
+ String jsonRequest = JSON.toJSONString(httpMap);
|
|
|
+ executorService.submit(() -> processVideoGeneration(po, jsonRequest));
|
|
|
+ return po;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, Object> prepareHttpParams(VideoVO vo) {
|
|
|
+ Map<String, Object> httpMap = new HashMap<>();
|
|
|
+ if("text".equals(vo.getCurrent())){
|
|
|
+ httpMap.put("persona_template", vo.getImagefile());
|
|
|
+ httpMap.put("audio_text", vo.getTxtCount());
|
|
|
+ httpMap.put("voice_type", vo.getTimbre());
|
|
|
+ } else {
|
|
|
+ httpMap.put("persona_template", vo.getImagefile());
|
|
|
+ httpMap.put("persona_audio", vo.getAudioFile());
|
|
|
+ }
|
|
|
+ return httpMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Transactional
|
|
|
+ protected void processVideoGeneration(VideoPo po, String jsonRequest) {
|
|
|
+ try {
|
|
|
+ Map<String, Object> responseMap = sendPostRequest(modelPath, jsonRequest);
|
|
|
+
|
|
|
+ if (responseMap != null && responseMap.containsKey("url")) {
|
|
|
+ po.setVideoUrl(responseMap.get("url").toString());
|
|
|
+ po.setStatus("completed");
|
|
|
+
|
|
|
+ // 延迟15秒后执行数据库更新
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ Thread.sleep(15000); // 15秒延时
|
|
|
+
|
|
|
+ // 使用新的事务执行更新操作
|
|
|
+ updateVideoStatusInNewTransaction(po);
|
|
|
+
|
|
|
+ // 通过WebSocket发送通知
|
|
|
+ WebSocketUtils.sendMessage(Long.valueOf(po.getId()), "视频生成完成");
|
|
|
+
|
|
|
+ logger.info("视频生成任务完成,ID: {}", po.getId());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ logger.error("延时被中断", e);
|
|
|
+ handleFailedTask(po, "系统异常: 任务延时被中断");
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("视频生成任务处理失败", e);
|
|
|
+ handleFailedTask(po, "系统异常: " + e.getMessage());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ handleFailedTask(po, "API响应格式异常");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("视频生成任务处理失败", e);
|
|
|
+ handleFailedTask(po, "系统异常: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 使用@Transactional注解创建新事务执行更新
|
|
|
+ @Transactional(propagation = Propagation.REQUIRES_NEW)
|
|
|
+ protected void updateVideoStatusInNewTransaction(VideoPo po) {
|
|
|
+ taskMapper.update(po);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleFailedTask(VideoPo po, String errorMsg) {
|
|
|
+ po.setStatus("failed");
|
|
|
+ po.setErrorMsg(errorMsg);
|
|
|
+ taskMapper.update(po);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送POST请求
|
|
|
+ */
|
|
|
+ public static Map<String, Object> sendPostRequest(String apiUrl, String jsonBody) throws IOException {
|
|
|
+ URL url = new URL(apiUrl);
|
|
|
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
|
|
+ connection.setRequestMethod("POST");
|
|
|
+ connection.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
|
|
|
+ connection.setDoOutput(true);
|
|
|
+
|
|
|
+ byte[] requestBody = jsonBody.getBytes(StandardCharsets.UTF_8);
|
|
|
+ connection.setRequestProperty("Content-Length", String.valueOf(requestBody.length));
|
|
|
+
|
|
|
+ try (DataOutputStream outputStream = new DataOutputStream(connection.getOutputStream())) {
|
|
|
+ outputStream.write(requestBody);
|
|
|
+ outputStream.flush();
|
|
|
+ }
|
|
|
+
|
|
|
+ int responseCode = connection.getResponseCode();
|
|
|
+ StringBuilder response = new StringBuilder();
|
|
|
+ try (BufferedReader reader = new BufferedReader(
|
|
|
+ new InputStreamReader(
|
|
|
+ responseCode >= 200 && responseCode < 300 ?
|
|
|
+ connection.getInputStream() : connection.getErrorStream(),
|
|
|
+ StandardCharsets.UTF_8
|
|
|
+ )
|
|
|
+ )) {
|
|
|
+ String line;
|
|
|
+ while ((line = reader.readLine()) != null) {
|
|
|
+ response.append(line);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 记录完整响应信息
|
|
|
+ logger.info("API 请求响应: 状态码={}, 响应数据={}", responseCode, response);
|
|
|
+
|
|
|
+ if (responseCode >= 200 && responseCode < 300) {
|
|
|
+ ObjectMapper objectMapper = new ObjectMapper();
|
|
|
+ return objectMapper.readValue(response.toString(), Map.class);
|
|
|
+ } else {
|
|
|
+ logger.error("API请求失败,状态码: {}", responseCode);
|
|
|
+ throw new IOException("API请求失败,状态码: " + responseCode);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 优雅关闭线程池
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void destroy() throws Exception {
|
|
|
+ executorService.shutdown();
|
|
|
+ try {
|
|
|
+ if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
|
|
|
+ executorService.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ executorService.shutdownNow();
|
|
|
+ }
|
|
|
+ logger.info("视频处理线程池已关闭");
|
|
|
+ }
|
|
|
+}
|