|
@@ -1,9 +1,12 @@
|
|
|
package com.uavps.framework.udp;
|
|
|
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.uavps.common.utils.spring.SpringUtils;
|
|
|
+import com.uavps.framework.FileUtils;
|
|
|
import com.uavps.framework.config.UdpConfig;
|
|
|
import com.uavps.framework.udp.utils.UdpDataUtils;
|
|
|
+import com.uavps.framework.utils.UdpUtils;
|
|
|
import com.uavps.system.service.IUavpsAircraftService;
|
|
|
import com.uavps.system.udp.vo.Aircraft;
|
|
|
import com.uavps.system.udp.vo.AircraftFormation;
|
|
@@ -13,9 +16,14 @@ import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.websocket.Session;
|
|
|
+import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.net.DatagramPacket;
|
|
|
import java.net.DatagramSocket;
|
|
|
+import java.net.SocketException;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.concurrent.BlockingQueue;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
|
|
|
/**
|
|
@@ -34,118 +42,106 @@ public class UdpServerService {
|
|
|
|
|
|
private IUavpsAircraftService uavpsAircraftService = SpringUtils.getBean(IUavpsAircraftService.class);
|
|
|
|
|
|
- /*public void receive(Session session) {
|
|
|
+ private static final DatagramSocket INSTANCE;
|
|
|
+
|
|
|
+ static {
|
|
|
try {
|
|
|
- //创建一个服务端对象,注册端口
|
|
|
- DatagramSocket socket = new DatagramSocket(udpConfig.getPort());
|
|
|
+ Integer port = SpringUtils.getBean(UdpConfig.class).getPort();
|
|
|
+ INSTANCE = new DatagramSocket(port);
|
|
|
+ log.info("UDP Server started on port: {}", port);
|
|
|
+ } catch (SocketException e) {
|
|
|
+ throw new RuntimeException("Failed to start UDP server", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- //创建一个数据包对象,用于接收数据,限制64kb
|
|
|
- byte[] data = new byte[1024 * 64];
|
|
|
- DatagramPacket packet = new DatagramPacket(data, data.length);
|
|
|
+ // 用于存储待写入的UDP包数据
|
|
|
+ private static final BlockingQueue<LogEntry> logQueue = new LinkedBlockingQueue<>();
|
|
|
|
|
|
- String uavData = "";
|
|
|
- boolean exit = false;
|
|
|
- while (true) {
|
|
|
- //使用数据包接收客户端发送的数据
|
|
|
- socket.receive(packet);
|
|
|
- uavData = new String(data, 0, packet.getLength());
|
|
|
- //websocket 退出,关闭upd服务
|
|
|
- if (uavData.length() == 4 && "EXIT".equals(uavData)) {
|
|
|
- socket.close();
|
|
|
- break;
|
|
|
- }
|
|
|
- JSONArray dataAry = JSON.parseArray(uavData);
|
|
|
- if (dataAry != null) {
|
|
|
- UavpsAircraft uavpsAircraft = null;
|
|
|
- for (int i = 0; i < dataAry.size(); i++) {
|
|
|
- UdpData udpData = JSON.parseObject(dataAry.getString(i), UdpData.class);
|
|
|
- uavpsAircraft = new UavpsAircraft();
|
|
|
- uavpsAircraft.setBizId(udpData.getBizId());
|
|
|
- uavpsAircraft.setDataTime(BigDecimal.valueOf(udpData.getTime()));
|
|
|
- uavpsAircraft.setAircraftData(dataAry.getString(i));
|
|
|
- uavpsAircraft.setCreateBy("python");
|
|
|
-
|
|
|
- //入库
|
|
|
- //uavpsAircraftService.insertUavpsAircraft(uavpsAircraft);
|
|
|
-
|
|
|
- //数据发送页面展示
|
|
|
- session.getBasicRemote().sendText(dataAry.getString(i));
|
|
|
- Thread.sleep(20);
|
|
|
-
|
|
|
- //判断数据是否发送完毕
|
|
|
- if ("true".equals(udpData.getFinished())) {
|
|
|
- exit = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (exit) {
|
|
|
- socket.close();
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (IOException | InterruptedException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ // 日志条目类
|
|
|
+ private static class LogEntry {
|
|
|
+ final byte[] data;
|
|
|
+ final int length;
|
|
|
+ final long time;
|
|
|
+
|
|
|
+ LogEntry(byte[] data, int length, long time) {
|
|
|
+ this.data = data;
|
|
|
+ this.length = length;
|
|
|
+ this.time = time;
|
|
|
}
|
|
|
- }*/
|
|
|
+ }
|
|
|
|
|
|
public void receive(Session session) {
|
|
|
try {
|
|
|
//创建一个服务端对象,注册端口
|
|
|
- DatagramSocket socket = new DatagramSocket(udpConfig.getPort());
|
|
|
//创建一个数据包对象,用于接收数据
|
|
|
byte[] data = new byte[1024];
|
|
|
DatagramPacket packet = new DatagramPacket(data, data.length);
|
|
|
|
|
|
String uavData = "";
|
|
|
- int i = 0;
|
|
|
+ Date initDate = new Date();
|
|
|
+ String json = "";
|
|
|
while (true) {
|
|
|
+ json = "";
|
|
|
+ // 设置初始时间,用于计算每个数据的时间差
|
|
|
+ if(TaskInfo.INSTANCE.getInitFlag()){
|
|
|
+ initDate = new Date();
|
|
|
+ log.info("任务Id:{},任务开始,开始时间是:{}", TaskInfo.INSTANCE.getBizId(), initDate);
|
|
|
+ }
|
|
|
+
|
|
|
//使用数据包接收客户端发送的数据
|
|
|
- socket.receive(packet);
|
|
|
+ INSTANCE.receive(packet);
|
|
|
uavData = new String(data, 0, packet.getLength());
|
|
|
- //websocket 退出,关闭upd服务
|
|
|
- if (uavData.length() == 4 && "EXIT".equals(uavData)) {
|
|
|
- socket.close();
|
|
|
- break;
|
|
|
- }
|
|
|
byte[] remoteData = UdpDataUtils.hexStringToBytes(uavData);
|
|
|
- //byte[] remoteData = packet.getData();
|
|
|
- if (remoteData != null && remoteData.length > 3) {
|
|
|
- UdpData udpData = UdpDataUtils.parseFrame(remoteData);
|
|
|
- //System.out.println(UdpDataUtils.bizId);
|
|
|
- if (udpData != null) {
|
|
|
- UdpDataUtils.time += 1;
|
|
|
- udpData.setTime(UdpDataUtils.time);
|
|
|
- if(udpData.getTargetAircraft() != null){
|
|
|
- for (AircraftFormation aircraftFormation : udpData.getTargetAircraft()) {
|
|
|
- for (Aircraft aircraft : aircraftFormation.getAircrafts()) {
|
|
|
- if(CoordinateSystem.INSTANCE.getInitFlag()){
|
|
|
- CoordinateSystem.INSTANCE.setX(aircraft.getCoordinateX() - CoordinateSystem.INSTANCE.getConversionCenterPixel());
|
|
|
- CoordinateSystem.INSTANCE.setY(aircraft.getCoordinateY() - CoordinateSystem.INSTANCE.getConversionCenterPixel());
|
|
|
- CoordinateSystem.INSTANCE.setInitFlag(false);
|
|
|
- }
|
|
|
- getNewXYAircraftCoordinate(aircraft);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- ObjectMapper mapper = new ObjectMapper();
|
|
|
- String json = mapper.writeValueAsString(udpData);
|
|
|
- session.getBasicRemote().sendText(json);
|
|
|
- //System.out.println(json);
|
|
|
- }
|
|
|
-// log.info("loop:"+(i++)+":"+udpData.getTargetAircraftsString());
|
|
|
- }
|
|
|
+ // 将日志条目放入队列
|
|
|
+ // 复制数据以避免后续修改影响
|
|
|
+ int length = remoteData.length;
|
|
|
+ byte[] dataCopy = new byte[length];
|
|
|
+ System.arraycopy(remoteData, 0, dataCopy, 0, length);
|
|
|
+ long lastTime = new Date().getTime() - initDate.getTime();
|
|
|
+ logQueue.add(new LogEntry(dataCopy, length, lastTime));
|
|
|
+ TaskInfo.INSTANCE.setLastTime(lastTime);
|
|
|
+ session.getBasicRemote().sendText(UdpUtils.getJson(remoteData));
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ log.error("UdpServerService.receive error", e);
|
|
|
+ throw new RuntimeException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void getNewXYAircraftCoordinate(Aircraft aircraft) {
|
|
|
- aircraft.setCoordinateOX(aircraft.getCoordinateX());
|
|
|
- aircraft.setCoordinateOY(aircraft.getCoordinateY());
|
|
|
- aircraft.setCoordinateOZ(aircraft.getCoordinateZ());
|
|
|
- aircraft.setCoordinateX((aircraft.getCoordinateX()-CoordinateSystem.INSTANCE.getX())/CoordinateSystem.INSTANCE.getConversionRate());
|
|
|
- aircraft.setCoordinateY((aircraft.getCoordinateY()-CoordinateSystem.INSTANCE.getY())/CoordinateSystem.INSTANCE.getConversionRate());
|
|
|
+ // 文件写入线程
|
|
|
+ private static class FileWriterThread extends Thread {
|
|
|
+ private volatile boolean running = true;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try (FileOutputStream fos = new FileOutputStream(TaskInfo.INSTANCE.getFilePath(), true)) {
|
|
|
+ while (running || !logQueue.isEmpty()) {
|
|
|
+ LogEntry entry = logQueue.take(); // 阻塞直到有数据
|
|
|
+
|
|
|
+ // 格式化时间戳(10位数字 + 冒号)
|
|
|
+ String timestamp = String.format("%010d:", entry.time);
|
|
|
+
|
|
|
+ // 先写入时间戳
|
|
|
+ fos.write(timestamp.getBytes());
|
|
|
+
|
|
|
+ // 再写入实际数据
|
|
|
+ fos.write(entry.data, 0, entry.length);
|
|
|
+
|
|
|
+ // 可选: 每条记录后加换行
|
|
|
+ fos.write('\n');
|
|
|
+
|
|
|
+ fos.flush();
|
|
|
+ }
|
|
|
+ } catch (IOException | InterruptedException e) {
|
|
|
+ if (running) { // 仅在非正常关闭时打印错误
|
|
|
+ System.err.println("File writer error: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void shutdown() {
|
|
|
+ running = false;
|
|
|
+ this.interrupt(); // 中断阻塞的take操作
|
|
|
+ }
|
|
|
}
|
|
|
}
|