Prechádzať zdrojové kódy

TCP协议, 客户端,服务端实现, 按照协商规则接收和发送消息

wanggaokun 1 rok pred
rodič
commit
94c6181603
27 zmenil súbory, kde vykonal 578 pridanie a 189 odobranie
  1. 0 4
      PHM-admin/phm-manage/pom.xml
  2. 1 0
      PHM-admin/phm-manage/src/main/java/com/phm/manage/controller/externalInter/ExternalInterfaceController.java
  3. 4 0
      PHM-admin/phm-netty/pom.xml
  4. 3 2
      PHM-admin/phm-netty/src/main/java/com/phm/netty/client/NettyClient.java
  5. 1 1
      PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/ClientListenerHandler.java
  6. 14 4
      PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/NettyClientInitialize.java
  7. 27 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/codec/hexbyte/HexByteArrayDecoder.java
  8. 17 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/codec/hexbyte/HexByteArrayEncoder.java
  9. 1 1
      PHM-admin/phm-netty/src/main/java/com/phm/netty/codec/message/MessageDecodeHandler.java
  10. 1 1
      PHM-admin/phm-netty/src/main/java/com/phm/netty/codec/message/MessageEncodeHandler.java
  11. 2 2
      PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyConfig.java
  12. 62 20
      PHM-admin/phm-netty/src/main/java/com/phm/netty/domain/Message.java
  13. 9 9
      PHM-admin/phm-netty/src/main/java/com/phm/netty/enums/MessageEnum.java
  14. 45 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/NettyServerInitialize.java
  15. 125 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/ByteArrayMessageHandler.java
  16. 33 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/HeartbeatHandler.java
  17. 80 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/HexByteArrayServerHandler.java
  18. 0 34
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/NettyServerInitialize.java
  19. 40 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/NettyServerLengthFieldBasedHandler.java
  20. 18 19
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/ServerListenerHandler.java
  21. 12 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/IProcessService.java
  22. 0 12
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/IPushMsgService.java
  23. 0 14
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/ISendMsgService.java
  24. 18 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/ProcessService.java
  25. 0 32
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/PushMsgService.java
  26. 0 34
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/SendMsgService.java
  27. 65 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/ByteUtils.java

+ 0 - 4
PHM-admin/phm-manage/pom.xml

@@ -25,10 +25,6 @@
             <groupId>com.phm</groupId>
             <artifactId>phm-system</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.phm</groupId>
-            <artifactId>phm-netty</artifactId>
-        </dependency>
     </dependencies>
 
 </project>

+ 1 - 0
PHM-admin/phm-manage/src/main/java/com/phm/manage/controller/externalInter/ExternalInterfaceController.java

@@ -43,6 +43,7 @@ public class ExternalInterfaceController {
     @Autowired
     private IOrderInfoService orderInfoService;
 
+
     /**
      * 指令接收API
      *

+ 4 - 0
PHM-admin/phm-netty/pom.xml

@@ -28,6 +28,10 @@
             <groupId>com.phm</groupId>
             <artifactId>phm-common</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.phm</groupId>
+            <artifactId>phm-manage</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 3 - 2
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/NettyClient.java

@@ -29,7 +29,8 @@ public class NettyClient {
      * 启动netty服务端
      */
     public void start() {
-        connect(nettyProperties.getRemoteHost(), nettyProperties.getRemotePort());
+//        connect(nettyProperties.getRemoteHost(), nettyProperties.getRemotePort());
+        connect("192.168.0.101", 8888);
     }
 
     public void connect(final String host, final int port) {
@@ -37,7 +38,7 @@ public class NettyClient {
         Bootstrap bootstrap = new Bootstrap()
                 .group(group).channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
-                .handler(new NettyClientInitialize());
+                .handler(new NettyClientInitialize(nettyProperties));
         try {
             ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
             channelFuture.channel().closeFuture().sync();

+ 1 - 1
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/ClientListenerHandler.java

@@ -43,7 +43,7 @@ public class ClientListenerHandler extends SimpleChannelInboundHandler<Message>
      */
     @Override
     protected void channelRead0(ChannelHandlerContext context, Message message) throws Exception {
-        log.info("收到服务端的消息:{}", message.getContent());
+        log.info("收到服务端的消息:{}", message.getData());
         context.channel().close();
     }
 

+ 14 - 4
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/NettyClientInitialize.java

@@ -1,7 +1,10 @@
 package com.phm.netty.client.handler;
 
-import com.phm.netty.utils.MessageDecodeHandler;
-import com.phm.netty.utils.MessageEncodeHandler;
+import com.phm.netty.config.NettyProperties;
+import com.phm.netty.codec.message.MessageDecodeHandler;
+import com.phm.netty.codec.message.MessageEncodeHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
@@ -15,11 +18,18 @@ import org.springframework.stereotype.Component;
 @Component
 public class NettyClientInitialize extends ChannelInitializer<SocketChannel> {
 
+    final
+    NettyProperties nettyProperties;
+
+    public NettyClientInitialize(NettyProperties nettyProperties) {
+        this.nettyProperties = nettyProperties;
+    }
+
     @Override
     protected void initChannel(SocketChannel socketChannel) throws Exception {
         // 数据分割符
-//        String delimiterStr = nettyProperties.getSeparator();
-//        ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
+        String delimiterStr = nettyProperties.getSeparator();
+        ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
         ChannelPipeline pipeline = socketChannel.pipeline();
         // 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节
 //        pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

+ 27 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/codec/hexbyte/HexByteArrayDecoder.java

@@ -0,0 +1,27 @@
+package com.phm.netty.codec.hexbyte;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+/**
+ * @Description HexByteArrayDecoder 自定义的解码器,将字节转换为十六进制字节数组
+ * @Author WGK
+ * @Date 2023/9/21 14:26
+ */
+public class HexByteArrayDecoder extends ByteToMessageDecoder {
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        int readableBytes = in.readableBytes();
+        if (readableBytes < 2) {
+            return; // 不足2字节无法解析
+        }
+        byte[] bytes = new byte[readableBytes];
+        in.readBytes(bytes);
+        ByteBufUtil.hexDump(in);
+        out.add(bytes);
+    }
+}

+ 17 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/codec/hexbyte/HexByteArrayEncoder.java

@@ -0,0 +1,17 @@
+package com.phm.netty.codec.hexbyte;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+/**
+ * @Description HexByteArrayEncoder 编码器,将十六进制字节数组编码为字节数据
+ * @Author WGK
+ * @Date 2023/9/21 14:33
+ */
+public class HexByteArrayEncoder extends MessageToByteEncoder<byte[]> {
+    @Override
+    protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
+        out.writeBytes(msg);
+    }
+}

+ 1 - 1
PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/MessageDecodeHandler.java → PHM-admin/phm-netty/src/main/java/com/phm/netty/codec/message/MessageDecodeHandler.java

@@ -1,4 +1,4 @@
-package com.phm.netty.utils;
+package com.phm.netty.codec.message;
 
 import com.phm.netty.domain.Message;
 import io.netty.buffer.ByteBuf;

+ 1 - 1
PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/MessageEncodeHandler.java → PHM-admin/phm-netty/src/main/java/com/phm/netty/codec/message/MessageEncodeHandler.java

@@ -1,4 +1,4 @@
-package com.phm.netty.utils;
+package com.phm.netty.codec.message;
 
 import com.phm.netty.domain.Message;
 import io.netty.buffer.ByteBuf;

+ 2 - 2
PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyConfig.java

@@ -1,6 +1,6 @@
 package com.phm.netty.config;
 
-import com.phm.netty.server.handler.NettyServerInitialize;
+import com.phm.netty.server.NettyServerInitialize;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -56,6 +56,6 @@ public class NettyConfig {
                 // 指定连接超时时间
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
                 // 指定worker处理器
-                .childHandler(new NettyServerInitialize());
+                .childHandler(new NettyServerInitialize(nettyProperties));
     }
 }

+ 62 - 20
PHM-admin/phm-netty/src/main/java/com/phm/netty/domain/Message.java

@@ -2,65 +2,107 @@ package com.phm.netty.domain;
 
 import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson2.JSONObject;
-import lombok.Getter;
-import lombok.Setter;
+import com.phm.netty.utils.ByteUtils;
+import io.netty.util.CharsetUtil;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
 
 /**
  * @Description Message
  * @Author WGK
  * @Date 2023/9/14 11:41
  */
-@Setter
-@Getter
-public class Message {
-    /**
-     * 客户端ID
-     */
-    private String id;
+@Data
+@Accessors(chain = true)
+public class Message implements Serializable {
     /**
      * 数据长度
      */
-    private Integer len;
+    private int len;
 
     /**
      * 接收的通讯数据body
      */
-    private String content;
+    private String data;
 
     /**
      * 消息类型
      */
-    private Integer type;
+    private short type;
 
     /**
      * 毫秒时间戳,从2020.1.1开始
      */
-    private Long timestamp = DateUtil.current(false) - DateUtil.parse("2020-01-01").getTime();
+    private int timestamp = (int) (DateUtil.current(false) - DateUtil.parse("2020-01-01").getTime());
 
     /**
      * 数据源
      */
-    private String source = "GPHM";
+    private String source = StringUtils.rightPad("GPHM", 8);
 
     /**
      * 目标
      */
-    private String target = "SPHNM";
+    private String target = StringUtils.rightPad("SPHM", 8);
 
     /**
      * 目标
      */
-    private Integer reserve = 0;
+    private int reserver = 0;
 
     public Message() {
     }
 
+    public static void bytesToMsg(Message msg, byte[] buf, int len) {
+        int index = 0;
+        msg.len = len;
+        msg.type = ByteUtils.toShort(buf, index);
+        index += 2;
+        msg.timestamp = ByteUtils.toInt(buf, index);
+        index += 4;
+        msg.reserver = ByteUtils.toInt(buf, index);
+        index += 4;
+        byte[] bytes = new byte[len - index];
+        System.arraycopy(buf, index, bytes, 0, len - index);
+        msg.data = new String(bytes, CharsetUtil.UTF_8);
+    }
+
+    public static void main(String[] args) {
+        Message message = new Message();
+        message.setLen(123).setTarget("GPHM").setSource("SPHM").setData("HHHHHHGGGFFFJHGSJDGHJSGDH");
+        byte[] byteArray = Message.msgToBytes(message,200);
+        System.out.println(Arrays.toString(byteArray));
+    }
+
+    public static byte[] msgToBytes(Message msg, int len) {
+        byte[] bytes = new byte[len + 4];
+        byte[] msgLenBytes = new byte[4];
+        msgLenBytes = ByteUtils.getBytes(msg.len, 4);
+        System.arraycopy(msgLenBytes, 0, bytes, 0, 4);
+        byte[] msgTypeBytes = new byte[2];
+        msgTypeBytes = ByteUtils.getBytes(msg.type, 2);
+        System.arraycopy(msgTypeBytes, 0, bytes, 4, 2);
+        byte[] msgTimeStampBytes = new byte[4];
+        msgTimeStampBytes = ByteUtils.getBytes(msg.timestamp, 4);
+        System.arraycopy(msgTimeStampBytes, 0, bytes, 6, 4);
+        byte[] msgReserverBytes = new byte[4];
+        msgReserverBytes = ByteUtils.getBytes(msg.reserver, 4);
+        System.arraycopy(msgReserverBytes, 0, bytes, 10, 4);
+        byte[] msgDataBytes = new byte[len];
+        msgDataBytes = msg.getData().getBytes();
+        System.arraycopy(msgDataBytes, 0, bytes, 14, msgDataBytes.length);
+        return bytes;
+    }
+
     public Message(Object object) {
         String str = object.toString();
         JSONObject jsonObject = JSONObject.parseObject(str);
-        type = Integer.valueOf(jsonObject.getString("type"));
-        content = jsonObject.getString("content");
-        id = jsonObject.getString("id");
+        type = Short.valueOf(jsonObject.getString("type"));
+        data = jsonObject.getString("content");
         len = str.length();
     }
 
@@ -71,7 +113,7 @@ public class Message {
                 "  \"source\": " + source + ",\n" +
                 "  \"target\": " + target + ",\n" +
                 "  \"timestamp\": " + timestamp + ",\n" +
-                "  \"content\": " + content + "\n" +
+                "  \"data\": " + data + "\n" +
                 "}";
     }
 }

+ 9 - 9
PHM-admin/phm-netty/src/main/java/com/phm/netty/enums/MessageEnum.java

@@ -22,9 +22,11 @@ public enum MessageEnum {
     ISOLATE(4, "机载故障深度隔离指令"),
     GROUND_FAULT(5, "地面故障诊断指令"),
     FAILURE_PREDICTION(6, "故障预测指令"),
-    // 以下作为客户端按发送
-    HEARTBEAT(7, "心跳机制"),
-    NTP(8, "校时机制");
+    FAILURE_PREDICTION1(7, "获取模型信息"),
+    FAILURE_PREDICTION2(8, "获取架次信息"),
+    ERROR(99, "异常信息"),
+    HEARTBEAT(100, "心跳机制"),
+    NTP(101, "校时机制");
 
     public final Integer type;
     public final String content;
@@ -36,12 +38,10 @@ public enum MessageEnum {
 
     // case中判断使用
     public static MessageEnum getStructureEnum(Message msg) {
-        Integer type = Optional.ofNullable(msg)
+        Integer type = Integer.valueOf(Optional.ofNullable(msg)
                 .map(Message::getType)
-                .orElse(0);
-        if (type == 0) {
-            return null;
-        } else {
+                .orElse((short) 0));
+        if (type != 0) {
             List<MessageEnum> objectEnums = Arrays.stream(MessageEnum.values())
                     .filter((item) -> item.getType().equals(type))
                     .distinct()
@@ -49,7 +49,7 @@ public enum MessageEnum {
             if (!objectEnums.isEmpty()) {
                 return objectEnums.get(0);
             }
-            return null;
         }
+        return null;
     }
 }

+ 45 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/NettyServerInitialize.java

@@ -0,0 +1,45 @@
+package com.phm.netty.server;
+
+import com.phm.netty.config.NettyProperties;
+import com.phm.netty.codec.hexbyte.HexByteArrayDecoder;
+import com.phm.netty.codec.hexbyte.HexByteArrayEncoder;
+import com.phm.netty.server.handler.ByteArrayMessageHandler;
+import com.phm.netty.server.handler.HeartbeatHandler;
+import com.phm.netty.server.handler.HexByteArrayServerHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.bytes.ByteArrayDecoder;
+import io.netty.handler.codec.bytes.ByteArrayEncoder;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description NettyServerHandler 服务端初始类
+ * @Author WGK
+ * @Date 2023/9/13 13:44
+ */
+@Component
+public class NettyServerInitialize extends ChannelInitializer<SocketChannel> {
+
+    final
+    NettyProperties nettyProperties;
+
+    public NettyServerInitialize(NettyProperties nettyProperties) {
+        this.nettyProperties = nettyProperties;
+    }
+
+    @Override
+    protected void initChannel(SocketChannel socketChannel) throws Exception {
+        ChannelPipeline pipeline = socketChannel.pipeline();
+        // 添加IdleStateHandler来处理空闲状态
+        pipeline.addLast(new IdleStateHandler(0, 5, 0));
+        // 添加心跳处理器
+        pipeline.addLast(new HeartbeatHandler());
+        pipeline.addLast(new ByteArrayDecoder());
+        pipeline.addLast(new ByteArrayEncoder());
+        // 对数据进行最终处理
+        pipeline.addLast(new ByteArrayMessageHandler());
+    }
+}
+

+ 125 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/ByteArrayMessageHandler.java

@@ -0,0 +1,125 @@
+package com.phm.netty.server.handler;
+
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.util.ObjectUtil;
+import com.phm.netty.service.IProcessService;
+import com.phm.netty.domain.Message;
+import com.phm.netty.utils.ChannelMap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @Description ByteArrayMessageHandler
+ * @Author WGK
+ * @Date 2023/9/20 13:40
+ */
+@ChannelHandler.Sharable
+@Slf4j
+@Component
+public class ByteArrayMessageHandler extends SimpleChannelInboundHandler<Object> {
+    @Autowired
+    IProcessService processService;
+
+    /**
+     * 设备接入连接时处理
+     *
+     * @param ctx ctx
+     */
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
+        Message msg = new Message();
+        msg.setData("和服务端连接成功");
+        msg.setTarget("Client");
+        msg.setSource("Serve");
+        ctx.channel().writeAndFlush(msg);
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Object object) throws Exception {
+        Message errorMsg = new Message();
+        errorMsg.setLen(35).setType((short) 99).setData("error");
+        byte[] binaryData = (byte[]) object;
+        binaryData = Message.msgToBytes(errorMsg, 35);
+        // 处理接收到的二进制消息
+        int len = 0;
+        if (binaryData.length >= 4) {
+            // 使用 ByteBuffer 读取前4个字节并转换为整数
+            ByteBuffer buffer = ByteBuffer.wrap(binaryData, 0, 4);
+            len = buffer.getInt();
+        }
+        if (len == 0) {
+            ctx.writeAndFlush(Message.msgToBytes(errorMsg, errorMsg.getLen()));
+            return;
+        }
+        // 创建一个新的字节数组来存储截取的部分
+        byte[] subArray = new byte[len];
+        // 方法将源字节数组的一部分复制到新数组中
+        System.arraycopy(binaryData, 4, subArray, 0, len);
+        Message msg = new Message();
+        Message.bytesToMsg(msg, subArray, len);
+        Message lastMsg = processService.orderHandle(msg);
+        log.info("输出传输的内容对象:{}", lastMsg);
+        if (ObjectUtil.isEmpty(lastMsg)) {
+            ctx.writeAndFlush(Message.msgToBytes(errorMsg, errorMsg.getLen()));
+            return;
+        }
+        // 转byte[] 返回消息
+        ctx.writeAndFlush(Message.msgToBytes(lastMsg, lastMsg.getLen()));
+    }
+
+    /**
+     * 设备下线处理
+     *
+     * @param ctx ctx
+     */
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) {
+        log.info("设备下线了:{}", ctx.channel().id().asLongText());
+        // map中移除channel
+        removeId(ctx);
+    }
+
+    /**
+     * 设备连接异常处理
+     *
+     * @param ctx   ctx
+     * @param cause cause
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        // 打印异常
+        log.info("异常:{}", cause.getMessage());
+        // map中移除channel
+        removeId(ctx);
+        // 关闭连接
+        ctx.close();
+    }
+
+    private void removeId(ChannelHandlerContext ctx) {
+        AttributeKey<String> key = AttributeKey.valueOf("id");
+        // 获取channel中id
+        String id = ctx.channel().attr(key).get();
+        // map移除channel
+        if (CollectionUtil.isNotEmpty(ChannelMap.getChannelMap())) {
+            ChannelMap.getChannelMap().remove(id);
+        } else {
+            log.info("ChannelMap === 为空");
+        }
+    }
+
+    public void channelWrite(Channel channel, Message message) throws Exception {
+        if (null == channel) {
+            throw new RuntimeException("客户端已离线");
+        }
+        channel.writeAndFlush(message);
+    }
+}

+ 33 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/HeartbeatHandler.java

@@ -0,0 +1,33 @@
+package com.phm.netty.server.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.CharsetUtil;
+
+/**
+ * @Description HeartbeatHandler 服务端心跳机制
+ *
+ * @Author WGK
+ * @Date 2023/9/19 22:17
+ */
+public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
+    private static final ByteBuf HEARTBEAT_MESSAGE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat", CharsetUtil.UTF_8));
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            // 如果触发了空闲事件,发送心跳消息
+            IdleStateEvent e = (IdleStateEvent) evt;
+            if (e.state() == IdleState.WRITER_IDLE) {
+                ctx.writeAndFlush(HEARTBEAT_MESSAGE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+}

+ 80 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/HexByteArrayServerHandler.java

@@ -0,0 +1,80 @@
+package com.phm.netty.server.handler;
+
+import cn.hutool.core.collection.CollectionUtil;
+import com.phm.netty.domain.Message;
+import com.phm.netty.utils.ChannelMap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @Description HexByteArrayServerHandler 服务端处理器,接收并处理十六进制字节数组消息
+ * @Author WGK
+ * @Date 2023/9/21 14:25
+ */
+@ChannelHandler.Sharable
+@Slf4j
+public class HexByteArrayServerHandler extends SimpleChannelInboundHandler<byte[]> {
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
+        // 在这里处理接收到的十六进制字节数组消息
+        log.info("Received message: ");
+        ctx.writeAndFlush(msg);
+    }
+
+    /**
+     * 设备下线处理
+     *
+     * @param ctx ctx
+     */
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) {
+        log.info("设备下线了:{}", ctx.channel().id().asLongText());
+        // map中移除channel
+        removeId(ctx);
+    }
+
+    /**
+     * 设备连接异常处理
+     *
+     * @param ctx   ctx
+     * @param cause cause
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        // 打印异常
+        log.info("异常:{}", cause.getMessage());
+        // map中移除channel
+        removeId(ctx);
+        // 关闭连接
+        ctx.close();
+    }
+
+    private void removeId(ChannelHandlerContext ctx) {
+        AttributeKey<String> key = AttributeKey.valueOf("id");
+        // 获取channel中id
+        String id = ctx.channel().attr(key).get();
+        // map移除channel
+        if (CollectionUtil.isNotEmpty(ChannelMap.getChannelMap())) {
+            ChannelMap.getChannelMap().remove(id);
+        } else {
+            log.info("ChannelMap === 为空");
+        }
+    }
+
+    public void channelWrite(Channel channel, Message message) throws Exception {
+        if (null == channel) {
+            throw new RuntimeException("客户端已离线");
+        }
+        channel.writeAndFlush(message);
+    }
+}

+ 0 - 34
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/NettyServerInitialize.java

@@ -1,34 +0,0 @@
-package com.phm.netty.server.handler;
-
-import com.phm.netty.utils.MessageDecodeHandler;
-import com.phm.netty.utils.MessageEncodeHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-import org.springframework.stereotype.Component;
-
-/**
- * @Description NettyServerHandler 服务端初始类
- * @Author WGK
- * @Date 2023/9/13 13:44
- */
-@Component
-public class NettyServerInitialize extends ChannelInitializer<SocketChannel> {
-
-    @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        // 数据分割符
-//        String delimiterStr = nettyProperties.getSeparator();
-//        ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
-        ChannelPipeline pipeline = socketChannel.pipeline();
-        // 使用自定义处理拆包/沾包,每次查找的最大长度 防止恶意数据
-//        pipeline.addLast(new DelimiterBasedFrameDecoder(151, delimiter));
-        // 将上一步解码后的数据转码为Message实例
-        pipeline.addLast(new MessageDecodeHandler());
-        // 对发送客户端的数据进行编码,并添加数据分隔符
-        pipeline.addLast(new MessageEncodeHandler());
-        // 对数据进行最终处理
-        pipeline.addLast(new ServerListenerHandler());
-    }
-}
-

+ 40 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/NettyServerLengthFieldBasedHandler.java

@@ -0,0 +1,40 @@
+package com.phm.netty.server.handler;
+
+import com.phm.netty.codec.message.MessageDecodeHandler;
+import com.phm.netty.codec.message.MessageEncodeHandler;
+import com.phm.netty.server.handler.ServerListenerHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+
+/**
+ * @Description NettyServerLengthFieldBasedHandler
+ * @Author WGK
+ * @Date 2023/9/18 15:44
+ */
+public class NettyServerLengthFieldBasedHandler extends ChannelInitializer<SocketChannel> {
+    @Override
+    protected void initChannel(SocketChannel socketChannel) throws Exception {
+        ChannelPipeline pipeline = socketChannel.pipeline();
+        // 请求头包含数据长度,根据长度进行沾包拆包处理
+        /*
+         * maxFrameLength:指定了每个包所能传递的最大数据包大小;
+         * lengthFieldOffset:指定了长度字段在字节码中的偏移量;
+         * lengthFieldLength:指定了长度字段所占用的字节长度;
+         * lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,这样就可以只得到消息体的数据,这里的lengthAdjustment指定的就是消息头的长度;
+         * initialBytesToStrip:对于长度字段在消息头中间的情况,可以通过initialBytesToStrip忽略掉消息头以及长度字段占用的字节。
+         */
+        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
+        // 在请求头添加字节长度字段
+        pipeline.addLast(new LengthFieldPrepender(4));
+        // 将上一步解码后的数据转码为Message实例
+        pipeline.addLast(new MessageDecodeHandler());
+        // 对发送客户端的数据进行编码
+        pipeline.addLast(new MessageEncodeHandler());
+        // 对数据进行最终处理
+        pipeline.addLast(new ServerListenerHandler());
+
+    }
+}

+ 18 - 19
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/ServerListenerHandler.java

@@ -12,8 +12,6 @@ import io.netty.util.AttributeKey;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
-import java.util.Objects;
-
 /**
  * @Description ServerListenerHandler 数据处理器,针对不同类型数据分类处理
  * @Author WGK
@@ -33,7 +31,7 @@ public class ServerListenerHandler extends SimpleChannelInboundHandler<Message>
     public void handlerAdded(ChannelHandlerContext ctx) {
         log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
         Message msg = new Message();
-        msg.setContent("和服务端连接成功");
+        msg.setData("和服务端连接成功");
         msg.setTarget("Client");
         msg.setSource("Serve");
         ctx.channel().writeAndFlush(msg);
@@ -48,24 +46,25 @@ public class ServerListenerHandler extends SimpleChannelInboundHandler<Message>
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
         // 获取消息实例中的消息体
-        String content = msg.getContent();
-        msg.setId(ctx.channel().id().asLongText());
+        String content = msg.getData();
+//        msg.setId(ctx.channel().id().asLongText());
         // 对不同消息类型进行处理
         MessageEnum type = MessageEnum.getStructureEnum(msg);
-        switch (Objects.requireNonNull(type)) {
-            case GET_DATA_BY_ONBOARD:
-                // 将通道加入ChannelMap
-                ChannelMap.getChannelMap().put(msg.getId(), ctx.channel());
-                // 将客户端ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
-                AttributeKey<String> key = AttributeKey.valueOf("id");
-                ctx.channel().attr(key).setIfAbsent(msg.getId());
-                this.channelWrite(ctx.channel(), msg);
-                // TODO 业务处理
-            case FALSE_ALARM:
-                // TODO 业务处理
-            default:
-                System.out.println(type.content + "消息内容: " + content);
-        }
+        this.channelWrite(ctx.channel(), msg);
+//        switch (Objects.requireNonNull(type)) {
+//            case GET_DATA_BY_ONBOARD:
+////                // 将通道加入ChannelMap
+////                ChannelMap.getChannelMap().put(msg.getId(), ctx.channel());
+////                // 将客户端ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
+////                AttributeKey<String> key = AttributeKey.valueOf("id");
+////                ctx.channel().attr(key).setIfAbsent(msg.getId());
+////                this.channelWrite(ctx.channel(), msg);
+//                // TODO 业务处理
+//            case FALSE_ALARM:
+//                // TODO 业务处理
+//            default:
+//                System.out.println(type.content + "消息内容: " + content);
+//        }
     }
 
     /**

+ 12 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/IProcessService.java

@@ -0,0 +1,12 @@
+package com.phm.netty.service;
+
+import com.phm.netty.domain.Message;
+
+/**
+ * @Description IProcessService
+ * @Author WGK
+ * @Date 2023/9/22 9:52
+ */
+public interface IProcessService {
+    Message orderHandle(Message message);
+}

+ 0 - 12
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/IPushMsgService.java

@@ -1,12 +0,0 @@
-package com.phm.netty.service;
-
-import com.phm.netty.domain.Message;
-
-/**
- * @Description PushMsgService
- * @Author WGK
- * @Date 2023/9/14 14:20
- */
-public interface IPushMsgService {
-    void push(Message message);
-}

+ 0 - 14
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/ISendMsgService.java

@@ -1,14 +0,0 @@
-package com.phm.netty.service;
-
-import com.phm.netty.domain.Message;
-
-/**
- * @Description ISendMsgService
- * @Author WGK
- * @Date 2023/9/15 9:05
- */
-public interface ISendMsgService {
-    void send(Message message);
-
-    void start();
-}

+ 18 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/ProcessService.java

@@ -0,0 +1,18 @@
+package com.phm.netty.service.impl;
+
+import com.phm.netty.service.IProcessService;
+import com.phm.netty.domain.Message;
+import org.springframework.stereotype.Service;
+
+/**
+ * @Description ProcessService
+ * @Author WGK
+ * @Date 2023/9/22 9:54
+ */
+@Service
+public class ProcessService implements IProcessService {
+    @Override
+    public Message orderHandle(Message message) {
+        return null;
+    }
+}

+ 0 - 32
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/PushMsgService.java

@@ -1,32 +0,0 @@
-package com.phm.netty.service.impl;
-
-import com.phm.netty.domain.Message;
-import com.phm.netty.utils.ChannelMap;
-import com.phm.netty.service.IPushMsgService;
-import io.netty.channel.Channel;
-import org.springframework.stereotype.Service;
-
-/**
- * @Description PushMsgService
- * @Author WGK
- * @Date 2023/9/14 14:21
- */
-@Service
-public class PushMsgService implements IPushMsgService {
-
-    /**
-     * 向客户端发送请求
-     *
-     * @param message 消息
-     */
-    @Override
-    public void push(Message message) {
-        // 客户端ID
-        String id = message.getId();
-        Channel channel = ChannelMap.getChannel(id);
-        if (null == channel) {
-            throw new RuntimeException("客户端已离线");
-        }
-        channel.writeAndFlush(message.toJsonString());
-    }
-}

+ 0 - 34
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/SendMsgService.java

@@ -1,34 +0,0 @@
-package com.phm.netty.service.impl;
-
-import com.phm.netty.client.NettyClient;
-import com.phm.netty.domain.Message;
-import com.phm.netty.service.ISendMsgService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-/**
- * @Description SendMsgService 客户端发送给客户端消息
- * @Author WGK
- * @Date 2023/9/15 9:06
- */
-@Service
-public class SendMsgService implements ISendMsgService {
-
-    @Autowired
-    private NettyClient nettyClient;
-
-    /**
-     * 向服务端发送请求
-     *
-     * @param message msg
-     */
-    @Override
-    public void send(Message message) {
-
-    }
-
-    @Override
-    public void start() {
-        nettyClient.start();
-    }
-}

+ 65 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/ByteUtils.java

@@ -0,0 +1,65 @@
+package com.phm.netty.utils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @Description ByteUtils
+ * @Author WGK
+ * @Date 2023/9/20 16:43
+ */
+public class ByteUtils {
+
+    /**
+     * 返回由字节数组中的指定的两个字节转换来的 16 位有符号整数
+     *
+     * @param bytes      字节数组
+     * @param startIndex 起始下标
+     * @return 由两个字节构成的 16 位有符号整数
+     */
+    public static short toShort(byte[] bytes, int startIndex) {
+        short shortValue = 0;
+        if (bytes.length >= 2) {
+            // 使用 ByteBuffer 读取前2个字节并转换为 short
+            ByteBuffer buffer = ByteBuffer.wrap(bytes, startIndex, 2);
+            shortValue = buffer.getShort();
+        }
+        return shortValue;
+    }
+
+    /**
+     * 返回由字节数组中的指定的四个字节转换来的 32 位有符号整数
+     *
+     * @param bytes      字节数组
+     * @param startIndex 起始下标
+     * @return 由四个字节构成的 32 位有符号整数
+     */
+
+    public static int toInt(byte[] bytes, int startIndex) {
+        int intValue = 0;
+        if (bytes.length >= 2) {
+            // 使用 ByteBuffer 读取前2个字节并转换为 short
+            ByteBuffer buffer = ByteBuffer.wrap(bytes, startIndex, 4);
+            intValue = buffer.getInt();
+        }
+        return intValue;
+    }
+
+    /**
+     * int shor 类型转byte[]
+     *
+     * @param object 被转对象
+     * @param len 数组长度
+     * @return 输出对应长度字节数组
+     */
+    public static byte[] getBytes(Object object, int len) {
+        ByteBuffer buffer = ByteBuffer.allocate(len);
+        if (object instanceof Integer) {
+            // 使用ByteBuffer将int转为byte数组
+            buffer.putInt((Integer) object);
+        } else if (object instanceof Short) {
+            // 使用ByteBuffer将Short转为byte数组
+            buffer.putShort((Short) object);
+        }
+        return buffer.array();
+    }
+}