|
@@ -1,12 +1,33 @@
|
|
|
package com.pdaaphm.system.service.impl;
|
|
|
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
+
|
|
|
+import com.pdaaphm.biz.domain.BaseResponse;
|
|
|
+import com.pdaaphm.common.config.PadaphmConfig;
|
|
|
import com.pdaaphm.common.utils.DateUtils;
|
|
|
+import com.pdaaphm.common.utils.DictUtils;
|
|
|
+import com.pdaaphm.common.utils.StringUtils;
|
|
|
+import com.pdaaphm.common.utils.file.FileUtils;
|
|
|
+import com.pdaaphm.common.utils.uuid.Seq;
|
|
|
+import com.pdaaphm.system.domain.AlgConfig;
|
|
|
+import com.pdaaphm.system.domain.Data;
|
|
|
+import com.pdaaphm.system.service.IAlgConfigService;
|
|
|
+import com.pdaaphm.system.service.IDataService;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.http.HttpStatus;
|
|
|
+import org.springframework.http.MediaType;
|
|
|
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import com.pdaaphm.system.mapper.TDataProcessMapper;
|
|
|
import com.pdaaphm.system.domain.TDataProcess;
|
|
|
import com.pdaaphm.system.service.ITDataProcessService;
|
|
|
+import org.springframework.web.reactive.function.client.WebClient;
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
|
|
|
/**
|
|
|
* 数据处理Service业务层处理
|
|
@@ -17,9 +38,17 @@ import com.pdaaphm.system.service.ITDataProcessService;
|
|
|
@Service
|
|
|
public class TDataProcessServiceImpl implements ITDataProcessService
|
|
|
{
|
|
|
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
|
+
|
|
|
@Autowired
|
|
|
private TDataProcessMapper tDataProcessMapper;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ IAlgConfigService algConfigService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ IDataService dataService;
|
|
|
+
|
|
|
/**
|
|
|
* 查询数据处理
|
|
|
*
|
|
@@ -100,4 +129,92 @@ public class TDataProcessServiceImpl implements ITDataProcessService
|
|
|
dataProcess.setProcessType(type);
|
|
|
return this.selectTDataProcessList(dataProcess);
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String runDataProcess(Long id) {
|
|
|
+ String errorMsg = "";
|
|
|
+ TDataProcess dataProcess = this.selectTDataProcessById(id);
|
|
|
+ if (dataProcess != null) {
|
|
|
+ AlgConfig algConfig = algConfigService.selectAlgConfigById(dataProcess.getProcessAlgId());
|
|
|
+ Data data = dataService.selectDataById(dataProcess.getProcessedDataId());
|
|
|
+ String bizAlgType = DictUtils.getDictLabel("biz_alg_type", dataProcess.getProcessType());
|
|
|
+
|
|
|
+ if (algConfig == null || data == null || bizAlgType == null) {
|
|
|
+ logger.warn("无法查找到该DataProcess的数据,id:{},algConfig:{},data:{},bizAlgType:{}", id, algConfig, data, bizAlgType);
|
|
|
+ errorMsg = "数据有残缺";
|
|
|
+ } else {
|
|
|
+ Map<String, String> resultMap = doRunDataProcess(algConfig, data, bizAlgType);
|
|
|
+ if (resultMap.get("errorMsg") != null){
|
|
|
+ // 成功场景
|
|
|
+ // create new data
|
|
|
+ Data newData = new Data();
|
|
|
+ newData.setDataPath(resultMap.get("resultPath"));
|
|
|
+ newData.setDataType("todo");
|
|
|
+ dataService.insertData(newData);
|
|
|
+ // update dataProcess
|
|
|
+ } else {
|
|
|
+ // 失败场景
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ logger.warn("无法查找到该DataProcess的数据,id:{}", id);
|
|
|
+ errorMsg = "未找到该数据!";
|
|
|
+ }
|
|
|
+ return errorMsg;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String,String> doRunDataProcess(AlgConfig algConfig, Data data, String bizAlgType) {
|
|
|
+ Map<String,String> result = new HashMap<>(2);
|
|
|
+ String resultPath = "";
|
|
|
+ String algUrl = algConfig.getAlgUrl();
|
|
|
+ String dataPath = data.getDataPath();
|
|
|
+ String fileName = FileUtils.getNameNotSuffix(dataPath);
|
|
|
+ String originalFileName = fileName.split("_")[0];
|
|
|
+ // 文件名 原始文件名_类型_时间戳
|
|
|
+ resultPath = StringUtils.format("{}/{}/{}_{}_{}.csv", PadaphmConfig.getUploadPath(), DateUtils.datePath(), originalFileName, bizAlgType, Seq.getId(Seq.uploadSeqType));
|
|
|
+ String errorMsg = sendHttp(algUrl,dataPath,resultPath);
|
|
|
+ result.put("resultPath",resultPath);
|
|
|
+ result.put("errorMsg",errorMsg);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private String sendHttp(String algUrl, String dataPath, String resultPath) {
|
|
|
+ HttpComponentsClientHttpRequestFactory httpRequestFactory = new HttpComponentsClientHttpRequestFactory();
|
|
|
+ httpRequestFactory.setConnectionRequestTimeout(3000);
|
|
|
+ httpRequestFactory.setConnectTimeout(3000);
|
|
|
+ httpRequestFactory.setReadTimeout(3000);
|
|
|
+ WebClient webClient = WebClient.create(algUrl);
|
|
|
+
|
|
|
+ Map<String,String> requestDto = new HashMap<>(4);
|
|
|
+ requestDto.put("file_url",dataPath);
|
|
|
+ requestDto.put("result_file_path",resultPath);
|
|
|
+
|
|
|
+ // send post
|
|
|
+ Mono<BaseResponse> result = webClient.post() // 使用POST方法
|
|
|
+ .uri("/request") // 指定URI
|
|
|
+ .contentType(MediaType.APPLICATION_JSON) // 设置Content-Type为application/json
|
|
|
+ .bodyValue(requestDto) // 设置请求body
|
|
|
+ .retrieve() // 执行请求
|
|
|
+ .bodyToMono(BaseResponse.class)
|
|
|
+ .onErrorResume(e -> {
|
|
|
+ logger.error("http request error!!!",e);
|
|
|
+ if (e instanceof TimeoutException) {
|
|
|
+ // 对超时错误进行处理
|
|
|
+ return Mono.just(BaseResponse.error("Timeout error"));
|
|
|
+ } else {
|
|
|
+ // 对其他错误进行处理
|
|
|
+ return Mono.just(BaseResponse.error("Other error"));
|
|
|
+ }
|
|
|
+ }); // 将响应体转换为String
|
|
|
+ // get response
|
|
|
+ BaseResponse res = result.block();
|
|
|
+ if(res == null) {
|
|
|
+ logger.error("response is null");
|
|
|
+ return "response is null";
|
|
|
+ }
|
|
|
+ if(Integer.valueOf("200").equals(res.getCode())){
|
|
|
+ return res.getMsg();
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
}
|