wanggaokun 1 سال پیش
والد
کامیت
a14006bb3d

+ 7 - 7
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/NettyClient.java → PHM-admin/phm-netty/src/main/java/com/phm/netty/client/NettyTcpClient.java

@@ -1,7 +1,7 @@
 package com.phm.netty.client;
 
 import com.phm.netty.client.handler.NettyClientInitialize;
-import com.phm.netty.config.NettyProperties;
+import com.phm.netty.config.NettyTcpProperties;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
@@ -17,12 +17,12 @@ import org.springframework.stereotype.Component;
  */
 @Component
 @Slf4j
-public class NettyClient {
+public class NettyTcpClient {
     final
-    NettyProperties nettyProperties;
+    NettyTcpProperties nettyTcpProperties;
 
-    public NettyClient(NettyProperties nettyProperties) {
-        this.nettyProperties = nettyProperties;
+    public NettyTcpClient(NettyTcpProperties nettyTcpProperties) {
+        this.nettyTcpProperties = nettyTcpProperties;
     }
 
     /**
@@ -38,7 +38,7 @@ public class NettyClient {
         Bootstrap bootstrap = new Bootstrap()
                 .group(group).channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
-                .handler(new NettyClientInitialize(nettyProperties));
+                .handler(new NettyClientInitialize(nettyTcpProperties));
         try {
             ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
             channelFuture.channel().closeFuture().sync();
@@ -49,6 +49,6 @@ public class NettyClient {
             // 关闭连接
             group.shutdownGracefully();
         }
-        log.info("启动Netty客户端: {}", nettyProperties.getPort());
+        log.info("启动Netty客户端: {}", nettyTcpProperties.getPort());
     }
 }

+ 69 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/NettyUdpClient.java

@@ -0,0 +1,69 @@
+package com.phm.netty.client;
+
+import com.phm.netty.client.handler.NettyUdpClientHandler;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @Description NettyUdpClient
+ * @Author WGK
+ * @Date 2023/10/30 15:25
+ */
+@Slf4j
+@Component
+public class NettyUdpClient {
+    public void bind(String address, int port, String data) {
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+        try {
+            Bootstrap clientBootstrap = new Bootstrap();
+            clientBootstrap = clientBootstrap.group(eventLoopGroup);
+            clientBootstrap = clientBootstrap.channel(NioDatagramChannel.class);
+            clientBootstrap = clientBootstrap.option(ChannelOption.SO_BROADCAST, true);
+            clientBootstrap = clientBootstrap.handler(new NettyUdpClientHandler());
+            Channel channel = clientBootstrap.bind(0).sync().channel();
+            channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(data, CharsetUtil.UTF_8), new InetSocketAddress(address, port))).sync();
+            log.info("channel_id = {} ", channel.id().toString());
+
+            //  方式一:查询等待超时 单位s  等待服务端原路返回的消息,
+            //  在channelRead0(ChannelHandlerContext ctx, DatagramPacket msg)方法中收到消息后可主动关闭channel,此处等待自然释放
+            channel.closeFuture().await(10000);
+
+            //	方式二:直接等待服务端原路返回后在
+            //	channelRead0(ChannelHandlerContext ctx, DatagramPacket msg)
+            //	方法中收到消息后可主动关闭channe
+            //  若服务端没有原路返回消息或者消息未收到将会一直等待,浪费资源
+            //  channel.closeFuture().sync();
+
+        } catch (Exception e) {
+            log.error(e.getMessage());
+        } finally {
+            log.info("netty client udp close!");
+            eventLoopGroup.shutdownGracefully();
+        }
+    }
+
+
+    //    测试
+    public static void main(String[] args) {
+
+        //  向网段内的所有机器广播UDP消息,这个没试过是不是这个原理
+        // new BootNettyUdpClient().bind("255.255.255.255",9999,"I am client");
+
+        // 指定某个套接字地址和发送的内容可以发送消息
+        // 该方式也可以封装成一个udp的客户端的send类
+        new NettyUdpClient().bind("127.0.0.1", 9999, "I am client");
+
+
+    }
+}

+ 5 - 5
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/NettyClientInitialize.java

@@ -1,6 +1,6 @@
 package com.phm.netty.client.handler;
 
-import com.phm.netty.config.NettyProperties;
+import com.phm.netty.config.NettyTcpProperties;
 import com.phm.netty.codec.message.MessageDecodeHandler;
 import com.phm.netty.codec.message.MessageEncodeHandler;
 import io.netty.buffer.ByteBuf;
@@ -19,16 +19,16 @@ import org.springframework.stereotype.Component;
 public class NettyClientInitialize extends ChannelInitializer<SocketChannel> {
 
     final
-    NettyProperties nettyProperties;
+    NettyTcpProperties nettyTcpProperties;
 
-    public NettyClientInitialize(NettyProperties nettyProperties) {
-        this.nettyProperties = nettyProperties;
+    public NettyClientInitialize(NettyTcpProperties nettyTcpProperties) {
+        this.nettyTcpProperties = nettyTcpProperties;
     }
 
     @Override
     protected void initChannel(SocketChannel socketChannel) throws Exception {
         // 数据分割符
-        String delimiterStr = nettyProperties.getSeparator();
+        String delimiterStr = nettyTcpProperties.getSeparator();
         ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
         ChannelPipeline pipeline = socketChannel.pipeline();
         // 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节

+ 58 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/NettyUdpClientHandler.java

@@ -0,0 +1,58 @@
+package com.phm.netty.client.handler;
+
+import com.phm.netty.domain.Message;
+import com.phm.netty.enums.OrderEnum;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @Description NettyUdpClientHandler
+ * @Author WGK
+ * @Date 2023/10/30 15:30
+ */
+@Slf4j
+public class NettyUdpClientHandler extends SimpleChannelInboundHandler<byte[]> {
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, byte[] binaryData) throws Exception {
+        try {
+            log.info("channelId---:{}", ctx.channel().id().toString());
+            Message msg = Message.getMessage(binaryData);
+            if (msg == null) {
+                return;
+            }
+            short type = msg.getType();
+            // 打印收到的消息
+            log.info("msg---:{}", msg);
+            if (OrderEnum.ORDER_CONFIG.getType() != type) {
+                // 数据
+                log.info("");
+            }
+            ctx.close();
+        } catch (Exception e) {
+            log.error(e.getMessage());
+        }
+    }
+
+    /**
+     * 重写方法
+     * 结构:
+     * 1.public class BootNettyUdpClientSimpleChannelInboundHandler extends SimpleChannelInboundHandler<DatagramPacket>
+     * 2.public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter
+     * 3.public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
+     * ChannelInboundHandlerAdapter类有诸多方法可以重写,可以根据具体需求来写
+     */
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        super.channelReadComplete(ctx);
+        log.info("关闭channel,channelId---:{}", ctx.channel().id().toString());
+        ctx.close();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+}

+ 7 - 7
PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyConfig.java → PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyTcpConfig.java

@@ -1,6 +1,6 @@
 package com.phm.netty.config;
 
-import com.phm.netty.server.NettyServerInitialize;
+import com.phm.netty.server.NettyTcpServerInitialize;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -17,10 +17,10 @@ import org.springframework.context.annotation.Configuration;
  */
 @Configuration
 @EnableConfigurationProperties
-public class NettyConfig {
+public class NettyTcpConfig {
 
     @Autowired
-    NettyProperties nettyProperties;
+    NettyTcpProperties nettyTcpProperties;
     /**
      * boss线程池-进行客户端连接
      *
@@ -28,7 +28,7 @@ public class NettyConfig {
      */
     @Bean
     public NioEventLoopGroup boosGroup() {
-        return new NioEventLoopGroup(nettyProperties.getBoss());
+        return new NioEventLoopGroup(nettyTcpProperties.getBoss());
     }
 
     /**
@@ -38,7 +38,7 @@ public class NettyConfig {
      */
     @Bean
     public NioEventLoopGroup workerGroup() {
-        return new NioEventLoopGroup(nettyProperties.getWorker());
+        return new NioEventLoopGroup(nettyTcpProperties.getWorker());
     }
 
     /**
@@ -54,8 +54,8 @@ public class NettyConfig {
                 // 指定使用的通道
                 .channel(NioServerSocketChannel.class)
                 // 指定连接超时时间
-                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyTcpProperties.getTimeout())
                 // 指定worker处理器
-                .childHandler(new NettyServerInitialize());
+                .childHandler(new NettyTcpServerInitialize());
     }
 }

+ 2 - 2
PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyProperties.java → PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyTcpProperties.java

@@ -15,8 +15,8 @@ import org.springframework.stereotype.Component;
 @Component
 @Data
 @ConfigurationProperties(prefix = "netty-init")
-@PropertySource(value = { "classpath:application-netty.yml" })
-public class NettyProperties {
+@PropertySource(value = {"classpath:application-tcp-netty.yml"})
+public class NettyTcpProperties {
 
     /**
      * boss线程数量

+ 34 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/controller/NettyUdpClientController.java

@@ -0,0 +1,34 @@
+package com.phm.netty.controller;
+
+import com.phm.common.core.controller.BaseController;
+import com.phm.common.core.page.TableDataInfo;
+import com.phm.manage.domain.common.CommonResult;
+import com.phm.netty.client.NettyUdpClient;
+import com.phm.system.domain.SysConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+/**
+ * @Description NettyUdpClientController
+ * @Author WGK
+ * @Date 2023/10/30 15:46
+ */
+@RestController
+@RequestMapping("/udp")
+public class NettyUdpClientController extends BaseController {
+    @Autowired
+    private NettyUdpClient udpClient;
+
+    @PostMapping("/getInfo")
+    public CommonResult<String> getInfo(SysConfig config) {
+        // udp客户端,向服务端发送获取数据请求服务
+        udpClient.bind("", 0, null);
+        return CommonResult.buildSuccess();
+    }
+}

+ 23 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/domain/Message.java

@@ -117,11 +117,13 @@ public class Message implements Serializable {
         message.setType(OrderEnum.RESPONSE.getType()).setTarget("GPHM").setSource("SPHM").setData("{\"id\": \"ZL001\",\"response\":200}");
         return Message.msgToBytes(message);
     }
+
     public static byte[] testMsg3() {
         Message message = new Message();
         message.setType(OrderEnum.TIMING.getType()).setTarget("GPHM").setSource("SPHM").setData("{\"id\": \"ZL001\",\"response\":200}");
         return Message.msgToBytes(message);
     }
+
     public static byte[] testMsg4() {
         Message message = new Message();
         message.setType(OrderEnum.ORDER_CONFIG.getType()).setTarget("GPHM").setSource("SPHM").setData("{\"cmdId\": \"getConfig\"}");
@@ -160,6 +162,25 @@ public class Message implements Serializable {
         len = str.length();
     }
 
+    public static Message getMessage(byte[] binaryData) {
+        // 处理接收到的二进制消息
+        int len = 0;
+        if (binaryData.length >= 4) {
+            // 读取的字节长度
+            len = BitUtils.toInt(binaryData, 0);
+        }
+        if (len <= 0) {
+            return null;
+        }
+        // 创建一个新的字节数组来存储截取的部分
+        byte[] subArray = new byte[len];
+        // 方法将源字节数组的一部分复制到新数组中
+        System.arraycopy(binaryData, 4, subArray, 0, len);
+        Message msg = new Message();
+        Message.bytesToMsg(msg, subArray, len);
+        return msg;
+    }
+
     public String toJsonString() {
         return "{" +
                 "\n  \"len\": " + len + ",\n" +
@@ -175,10 +196,12 @@ public class Message implements Serializable {
         ObjectMapper objectMapper = new ObjectMapper();
         return objectMapper.writeValueAsString(CommonResponse.buildSuccess(code));
     }
+
     public static String successDate(String code) throws JsonProcessingException {
         ObjectMapper objectMapper = new ObjectMapper();
         return objectMapper.writeValueAsString(CommonResponse.buildSuccess(code));
     }
+
     public static String success() throws JsonProcessingException {
         ObjectMapper objectMapper = new ObjectMapper();
         return objectMapper.writeValueAsString(CommonResponse.success());

+ 8 - 8
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/NettyServerBoot.java → PHM-admin/phm-netty/src/main/java/com/phm/netty/server/NettyTcpServerBoot.java

@@ -1,6 +1,6 @@
 package com.phm.netty.server;
 
-import com.phm.netty.config.NettyProperties;
+import com.phm.netty.config.NettyTcpProperties;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.nio.NioEventLoopGroup;
 import lombok.extern.slf4j.Slf4j;
@@ -18,7 +18,7 @@ import javax.annotation.Resource;
  */
 @Slf4j
 @Component
-public class NettyServerBoot {
+public class NettyTcpServerBoot {
     @Resource
     NioEventLoopGroup boosGroup;
     @Resource
@@ -26,11 +26,11 @@ public class NettyServerBoot {
     final ServerBootstrap serverBootstrap;
 
     final
-    NettyProperties nettyProperties;
+    NettyTcpProperties nettyTcpProperties;
 
-    public NettyServerBoot(ServerBootstrap serverBootstrap, NettyProperties nettyProperties) {
+    public NettyTcpServerBoot(ServerBootstrap serverBootstrap, NettyTcpProperties nettyTcpProperties) {
         this.serverBootstrap = serverBootstrap;
-        this.nettyProperties = nettyProperties;
+        this.nettyTcpProperties = nettyTcpProperties;
     }
 
     /**
@@ -42,11 +42,11 @@ public class NettyServerBoot {
     public void start() throws InterruptedException {
 
         // 绑定监听端口启动
-        serverBootstrap.bind(nettyProperties.getPort()).sync();
+        serverBootstrap.bind(nettyTcpProperties.getPort()).sync();
 
         // 备用端口
-        serverBootstrap.bind(nettyProperties.getPortSalve()).sync();
-        log.info("启动Netty: {},{}", nettyProperties.getPort(), nettyProperties.getPortSalve());
+        serverBootstrap.bind(nettyTcpProperties.getPortSalve()).sync();
+        log.info("启动Netty: {},{}", nettyTcpProperties.getPort(), nettyTcpProperties.getPortSalve());
     }
 
     /**

+ 1 - 1
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/NettyServerInitialize.java → PHM-admin/phm-netty/src/main/java/com/phm/netty/server/NettyTcpServerInitialize.java

@@ -16,7 +16,7 @@ import org.springframework.stereotype.Component;
  * @Date 2023/9/13 13:44
  */
 @Component
-public class NettyServerInitialize extends ChannelInitializer<SocketChannel> {
+public class NettyTcpServerInitialize extends ChannelInitializer<SocketChannel> {
 
     @Override
     protected void initChannel(SocketChannel socketChannel) throws Exception {

+ 3 - 16
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/ByteArrayMessageHandler.java

@@ -11,10 +11,8 @@ import com.phm.netty.constant.MsgConstant;
 import com.phm.netty.domain.Message;
 import com.phm.netty.enums.OrderEnum;
 import com.phm.netty.service.IProcessService;
-import com.phm.netty.utils.BitUtils;
 import com.phm.netty.utils.ChannelMap;
 import com.phm.netty.utils.JsonUtils;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -70,22 +68,10 @@ public class ByteArrayMessageHandler extends SimpleChannelInboundHandler<byte[]>
      */
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, byte[] binaryData) throws Exception {
-        // 处理接收到的二进制消息
-        int len = 0;
-        if (binaryData.length >= 4) {
-            // 读取的字节长度
-            len = BitUtils.toInt(binaryData, 0);
-        }
-        if (len <= 0) {
-            log.error("解析len属性,条件不满足:{}", len);
+        Message msg = Message.getMessage(binaryData);
+        if (msg == null) {
             return;
         }
-        // 创建一个新的字节数组来存储截取的部分
-        byte[] subArray = new byte[len];
-        // 方法将源字节数组的一部分复制到新数组中
-        System.arraycopy(binaryData, 4, subArray, 0, len);
-        Message msg = new Message();
-        Message.bytesToMsg(msg, subArray, len);
         short type = msg.getType();
         Message lastMsg = null;
         if (OrderEnum.COMMON.getType() == type && saveChannel(ctx, msg)) {
@@ -123,6 +109,7 @@ public class ByteArrayMessageHandler extends SimpleChannelInboundHandler<byte[]>
 
     /**
      * 保存消息通道 信息
+     *
      * @param ctx ctx
      * @param msg msg
      * @return res

+ 9 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/IProcessService.java

@@ -37,4 +37,13 @@ public interface IProcessService {
      * @throws Exception exception
      */
     List<OrderConfig> getParameterConfig(Message message) throws Exception;
+
+    /**
+     * 解析Data
+     *
+     * @param message message
+     * @return res
+     * @throws Exception ex
+     */
+    List<String> proData(Message message) throws Exception;
 }

+ 12 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/ProcessService.java

@@ -1,5 +1,6 @@
 package com.phm.netty.service.impl;
 
+import cn.hutool.json.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.phm.manage.domain.OrderConfig;
 import com.phm.manage.domain.OrderInfo;
@@ -90,4 +91,15 @@ public class ProcessService implements IProcessService {
     public List<OrderConfig> getParameterConfig(Message message) throws Exception {
         return orderConfigService.selectOrderConfigList(null);
     }
+
+    @Override
+    public List<String> proData(Message message) throws Exception {
+        JSONObject jsonObject = JSONObject.parseObject(message.getData());
+        String cmdId = jsonObject.getString("cmdid");
+        String params = jsonObject.getString("params");
+        JSONArray jsonArray = new JSONArray(params);
+        // 解析数据集合,保存 TODO
+
+        return null;
+    }
 }

+ 0 - 0
PHM-admin/phm-netty/src/main/resources/application-netty.yml → PHM-admin/phm-netty/src/main/resources/application-tcp-netty.yml


+ 23 - 0
PHM-admin/phm-netty/src/main/resources/application-udp-netty.yml

@@ -0,0 +1,23 @@
+# netty 配置
+netty-init:
+  # boss线程数量
+  boss: 4
+  # worker线程数量
+  worker: 2
+  # 连接超时时间
+  timeout: 1000
+  # 服务器主端口
+  port: 18000
+  # 服务器备用端口
+  portSalve: 18001
+  # 解码最大长度
+  de-code-size: 2048
+  # 分割符号
+  separator: $@
+  # 服务器地址
+  host: 127.0.0.1
+  #远程端口
+  remote-port: 18000
+  #远程服务器地址
+  remote-host: 127.0.0.1
+