Browse Source

TCP协议, 客户端,服务端实现

wanggaokun 1 year ago
parent
commit
9317433c33
19 changed files with 828 additions and 1 deletions
  1. 0 1
      PHM-admin/phm-admin/src/main/java/com/phm/PHMApplication.java
  2. 53 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/client/NettyClient.java
  3. 62 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/ClientListenerHandler.java
  4. 30 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/NettyClientInitialize.java
  5. 61 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyConfig.java
  6. 80 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyProperties.java
  7. 77 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/domain/Message.java
  8. 55 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/enums/MessageEnum.java
  9. 61 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/NettyServerBoot.java
  10. 34 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/NettyServerInitialize.java
  11. 117 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/ServerListenerHandler.java
  12. 12 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/IPushMsgService.java
  13. 14 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/ISendMsgService.java
  14. 32 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/PushMsgService.java
  15. 34 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/SendMsgService.java
  16. 36 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/ChannelMap.java
  17. 26 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/MessageDecodeHandler.java
  18. 21 0
      PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/MessageEncodeHandler.java
  19. 23 0
      PHM-admin/phm-netty/src/main/resources/application-netty.yml

+ 0 - 1
PHM-admin/phm-admin/src/main/java/com/phm/PHMApplication.java

@@ -3,7 +3,6 @@ package com.phm;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.context.annotation.ComponentScan;
 
 /**
  * 启动程序

+ 53 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/NettyClient.java

@@ -0,0 +1,53 @@
+package com.phm.netty.client;
+
+import com.phm.netty.client.handler.NettyClientInitialize;
+import com.phm.netty.config.NettyProperties;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description NettyClient
+ * @Author WGK
+ * @Date 2023/9/15 11:30
+ */
+@Component
+@Slf4j
+public class NettyClient {
+    final
+    NettyProperties nettyProperties;
+
+    public NettyClient(NettyProperties nettyProperties) {
+        this.nettyProperties = nettyProperties;
+    }
+
+    /**
+     * 启动netty服务端
+     */
+    public void start() {
+        connect(nettyProperties.getRemoteHost(), nettyProperties.getRemotePort());
+    }
+
+    public void connect(final String host, final int port) {
+        NioEventLoopGroup group = new NioEventLoopGroup();
+        Bootstrap bootstrap = new Bootstrap()
+                .group(group).channel(NioSocketChannel.class)
+                .option(ChannelOption.TCP_NODELAY, true)
+                .handler(new NettyClientInitialize());
+        try {
+            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
+            channelFuture.channel().closeFuture().sync();
+        } catch (InterruptedException exception) {
+            log.error("启动客户端异常!!");
+            log.error(exception.getMessage());
+        } finally {
+            // 关闭连接
+            group.shutdownGracefully();
+        }
+        log.info("启动Netty客户端: {}", nettyProperties.getPort());
+    }
+}

+ 62 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/ClientListenerHandler.java

@@ -0,0 +1,62 @@
+package com.phm.netty.client.handler;
+
+import com.phm.netty.domain.Message;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @Description ClientListenerHandler 客户端业务监听
+ * @Author WGK
+ * @Date 2023/9/15 9:22
+ */
+@Slf4j
+public class ClientListenerHandler extends SimpleChannelInboundHandler<Message> {
+
+    /**
+     * 连接服务端
+     *
+     * @param context 上下文
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext context) {
+        log.info("{}连上了服务器", context.channel().remoteAddress());
+    }
+
+    /**
+     * 断开服务器
+     *
+     * @param context 上下文
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext context) {
+        log.info("{}断开了服务器", context.channel().remoteAddress());
+        context.fireChannelInactive();
+    }
+
+    /**
+     * 接收到客户端数据,业务处理
+     *
+     * @param context 上下文
+     * @param message 消息信息
+     * @throws Exception 异常
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext context, Message message) throws Exception {
+        log.info("收到服务端的消息:{}", message.getContent());
+        context.channel().close();
+    }
+
+    /**
+     * 异常发生时候调用
+     *
+     * @param context 上下文
+     * @param cause 异常
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+        log.error("{}连接发生异常", context.channel().remoteAddress());
+        log.error(cause.getCause().toString());
+        context.close();
+    }
+}

+ 30 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/client/handler/NettyClientInitialize.java

@@ -0,0 +1,30 @@
+package com.phm.netty.client.handler;
+
+import com.phm.netty.utils.MessageDecodeHandler;
+import com.phm.netty.utils.MessageEncodeHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description NettyClientInitialize 客户端初始类
+ * @Author WGK
+ * @Date 2023/9/15 9:21
+ */
+@Component
+public class NettyClientInitialize extends ChannelInitializer<SocketChannel> {
+
+    @Override
+    protected void initChannel(SocketChannel socketChannel) throws Exception {
+        // 数据分割符
+//        String delimiterStr = nettyProperties.getSeparator();
+//        ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
+        ChannelPipeline pipeline = socketChannel.pipeline();
+        // 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节
+//        pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
+        pipeline.addLast(new MessageEncodeHandler());
+        pipeline.addLast(new MessageDecodeHandler());
+        pipeline.addLast(new ClientListenerHandler());
+    }
+}

+ 61 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyConfig.java

@@ -0,0 +1,61 @@
+package com.phm.netty.config;
+
+import com.phm.netty.server.handler.NettyServerInitialize;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @Description NettyConfig 配置
+ * @Author WGK
+ * @Date 2023/9/14 11:37
+ */
+@Configuration
+@EnableConfigurationProperties
+public class NettyConfig {
+
+    @Autowired
+    NettyProperties nettyProperties;
+    /**
+     * boss线程池-进行客户端连接
+     *
+     * @return NioEventLoopGroup
+     */
+    @Bean
+    public NioEventLoopGroup boosGroup() {
+        return new NioEventLoopGroup(nettyProperties.getBoss());
+    }
+
+    /**
+     * worker线程池-进行业务处理
+     *
+     * @return NioEventLoopGroup
+     */
+    @Bean
+    public NioEventLoopGroup workerGroup() {
+        return new NioEventLoopGroup(nettyProperties.getWorker());
+    }
+
+    /**
+     * 服务端启动器,监听客户端连接
+     *
+     * @return ServerBootstrap
+     */
+    @Bean
+    public ServerBootstrap serverBootstrap() {
+        return new ServerBootstrap()
+                // 指定使用的线程组
+                .group(boosGroup(), workerGroup())
+                // 指定使用的通道
+                .channel(NioServerSocketChannel.class)
+                // 指定连接超时时间
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
+                // 指定worker处理器
+                .childHandler(new NettyServerInitialize());
+    }
+}

+ 80 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/config/NettyProperties.java

@@ -0,0 +1,80 @@
+package com.phm.netty.config;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description NettyProperties 配置项
+ *
+ * @Author WGK
+ * @Date 2023/9/14 11:31
+ */
+@Component
+@Data
+@ConfigurationProperties(prefix = "netty-init")
+@PropertySource(value = { "classpath:application-netty.yml" })
+public class NettyProperties {
+
+    /**
+     * boss线程数量
+     */
+    @Value("${boss}")
+    private Integer boss;
+
+    /**
+     * worker线程数量
+     */
+    @Value("${worker}")
+    private Integer worker;
+
+    /**
+     * 连接超时时间
+     */
+    @Value("${timeout}")
+    private Integer timeout;
+
+    /**
+     * 服务器主端口
+     */
+    @Value("${port}")
+    private Integer port;
+
+    /**
+     * 服务器备用端口
+     */
+    @Value("${portSalve}")
+    private Integer portSalve;
+
+    /**
+     * 服务器备用端口
+     */
+    @Value("${de-code-size}")
+    private Integer deCodeSize;
+
+    /**
+     * 服务器备用端口
+     */
+    @Value("${separator}")
+    private String separator;
+
+    /**
+     * 服务器地址
+     */
+    @Value("${host}")
+    private String host;
+
+    /**
+     * 远程端口
+     */
+    @Value("${remote-port}")
+    private Integer remotePort;
+
+    /**
+     * 远程服务器地址
+     */
+    @Value("${remote-host}")
+    private String remoteHost;
+}

+ 77 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/domain/Message.java

@@ -0,0 +1,77 @@
+package com.phm.netty.domain;
+
+import cn.hutool.core.date.DateUtil;
+import com.alibaba.fastjson2.JSONObject;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * @Description Message
+ * @Author WGK
+ * @Date 2023/9/14 11:41
+ */
+@Setter
+@Getter
+public class Message {
+    /**
+     * 客户端ID
+     */
+    private String id;
+    /**
+     * 数据长度
+     */
+    private Integer len;
+
+    /**
+     * 接收的通讯数据body
+     */
+    private String content;
+
+    /**
+     * 消息类型
+     */
+    private Integer type;
+
+    /**
+     * 毫秒时间戳,从2020.1.1开始
+     */
+    private Long timestamp = DateUtil.current(false) - DateUtil.parse("2020-01-01").getTime();
+
+    /**
+     * 数据源
+     */
+    private String source = "GPHM";
+
+    /**
+     * 目标
+     */
+    private String target = "SPHNM";
+
+    /**
+     * 目标
+     */
+    private Integer reserve = 0;
+
+    public Message() {
+    }
+
+    public Message(Object object) {
+        String str = object.toString();
+        JSONObject jsonObject = JSONObject.parseObject(str);
+        type = Integer.valueOf(jsonObject.getString("type"));
+        content = jsonObject.getString("content");
+        id = jsonObject.getString("id");
+        len = str.length();
+    }
+
+    public String toJsonString() {
+        return "{" +
+                "\n  \"len\": " + len + ",\n" +
+                "  \"type\": " + type + ",\n" +
+                "  \"source\": " + source + ",\n" +
+                "  \"target\": " + target + ",\n" +
+                "  \"timestamp\": " + timestamp + ",\n" +
+                "  \"content\": " + content + "\n" +
+                "}";
+    }
+}

+ 55 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/enums/MessageEnum.java

@@ -0,0 +1,55 @@
+package com.phm.netty.enums;
+
+import com.phm.netty.domain.Message;
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * @Description MessageEnum 数据类型枚举类
+ * @Author WGK
+ * @Date 2023/9/14 11:47
+ */
+@Getter
+public enum MessageEnum {
+
+    GET_DATA_BY_ONBOARD(1, "机载数据获取指令"),
+    GET_DATA_BY_CHAIN(2, "数据链与数据仿真数据获取指令"),
+    FALSE_ALARM(3, "机载故障虚警抑制指令"),
+    ISOLATE(4, "机载故障深度隔离指令"),
+    GROUND_FAULT(5, "地面故障诊断指令"),
+    FAILURE_PREDICTION(6, "故障预测指令"),
+    // 以下作为客户端按发送
+    HEARTBEAT(7, "心跳机制"),
+    NTP(8, "校时机制");
+
+    public final Integer type;
+    public final String content;
+
+    MessageEnum(Integer type, String content) {
+        this.type = type;
+        this.content = content;
+    }
+
+    // case中判断使用
+    public static MessageEnum getStructureEnum(Message msg) {
+        Integer type = Optional.ofNullable(msg)
+                .map(Message::getType)
+                .orElse(0);
+        if (type == 0) {
+            return null;
+        } else {
+            List<MessageEnum> objectEnums = Arrays.stream(MessageEnum.values())
+                    .filter((item) -> item.getType().equals(type))
+                    .distinct()
+                    .collect(Collectors.toList());
+            if (!objectEnums.isEmpty()) {
+                return objectEnums.get(0);
+            }
+            return null;
+        }
+    }
+}

+ 61 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/NettyServerBoot.java

@@ -0,0 +1,61 @@
+package com.phm.netty.server;
+
+import com.phm.netty.config.NettyProperties;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.nio.NioEventLoopGroup;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+
+/**
+ * @Description NettyServerBoot 服务端启动类
+ *
+ * @Author WGK
+ * @Date 2023/9/14 11:50
+ */
+@Slf4j
+@Component
+public class NettyServerBoot {
+    @Resource
+    NioEventLoopGroup boosGroup;
+    @Resource
+    NioEventLoopGroup workerGroup;
+    final ServerBootstrap serverBootstrap;
+
+    final
+    NettyProperties nettyProperties;
+
+    public NettyServerBoot(ServerBootstrap serverBootstrap, NettyProperties nettyProperties) {
+        this.serverBootstrap = serverBootstrap;
+        this.nettyProperties = nettyProperties;
+    }
+
+    /**
+     * 启动netty服务端
+     *
+     * @throws InterruptedException 异常
+     */
+    @PostConstruct
+    public void start() throws InterruptedException {
+
+        // 绑定监听端口启动
+        serverBootstrap.bind(nettyProperties.getPort()).sync();
+
+        // 备用端口
+        serverBootstrap.bind(nettyProperties.getPortSalve()).sync();
+        log.info("启动Netty: {},{}", nettyProperties.getPort(), nettyProperties.getPortSalve());
+    }
+
+    /**
+     * 关闭netty
+     */
+    @PreDestroy
+    public void close() {
+        log.info("关闭Netty");
+        boosGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+    }
+}

+ 34 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/NettyServerInitialize.java

@@ -0,0 +1,34 @@
+package com.phm.netty.server.handler;
+
+import com.phm.netty.utils.MessageDecodeHandler;
+import com.phm.netty.utils.MessageEncodeHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description NettyServerHandler 服务端初始类
+ * @Author WGK
+ * @Date 2023/9/13 13:44
+ */
+@Component
+public class NettyServerInitialize extends ChannelInitializer<SocketChannel> {
+
+    @Override
+    protected void initChannel(SocketChannel socketChannel) throws Exception {
+        // 数据分割符
+//        String delimiterStr = nettyProperties.getSeparator();
+//        ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
+        ChannelPipeline pipeline = socketChannel.pipeline();
+        // 使用自定义处理拆包/沾包,每次查找的最大长度 防止恶意数据
+//        pipeline.addLast(new DelimiterBasedFrameDecoder(151, delimiter));
+        // 将上一步解码后的数据转码为Message实例
+        pipeline.addLast(new MessageDecodeHandler());
+        // 对发送客户端的数据进行编码,并添加数据分隔符
+        pipeline.addLast(new MessageEncodeHandler());
+        // 对数据进行最终处理
+        pipeline.addLast(new ServerListenerHandler());
+    }
+}
+

+ 117 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/ServerListenerHandler.java

@@ -0,0 +1,117 @@
+package com.phm.netty.server.handler;
+
+import cn.hutool.core.collection.CollectionUtil;
+import com.phm.netty.domain.Message;
+import com.phm.netty.enums.MessageEnum;
+import com.phm.netty.utils.ChannelMap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Objects;
+
+/**
+ * @Description ServerListenerHandler 数据处理器,针对不同类型数据分类处理
+ * @Author WGK
+ * @Date 2023/9/14 11:45
+ */
+@ChannelHandler.Sharable
+@Slf4j
+@Component
+public class ServerListenerHandler extends SimpleChannelInboundHandler<Message> {
+
+    /**
+     * 设备接入连接时处理
+     *
+     * @param ctx ctx
+     */
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
+        Message msg = new Message();
+        msg.setContent("和服务端连接成功");
+        msg.setTarget("Client");
+        msg.setSource("Serve");
+        ctx.channel().writeAndFlush(msg);
+    }
+
+    /**
+     * 数据处理
+     *
+     * @param ctx ctx
+     * @param msg msg
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
+        // 获取消息实例中的消息体
+        String content = msg.getContent();
+        msg.setId(ctx.channel().id().asLongText());
+        // 对不同消息类型进行处理
+        MessageEnum type = MessageEnum.getStructureEnum(msg);
+        switch (Objects.requireNonNull(type)) {
+            case GET_DATA_BY_ONBOARD:
+                // 将通道加入ChannelMap
+                ChannelMap.getChannelMap().put(msg.getId(), ctx.channel());
+                // 将客户端ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
+                AttributeKey<String> key = AttributeKey.valueOf("id");
+                ctx.channel().attr(key).setIfAbsent(msg.getId());
+                this.channelWrite(ctx.channel(), msg);
+                // TODO 业务处理
+            case FALSE_ALARM:
+                // TODO 业务处理
+            default:
+                System.out.println(type.content + "消息内容: " + content);
+        }
+    }
+
+    /**
+     * 设备下线处理
+     *
+     * @param ctx ctx
+     */
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) {
+        log.info("设备下线了:{}", ctx.channel().id().asLongText());
+        // map中移除channel
+        removeId(ctx);
+    }
+
+    /**
+     * 设备连接异常处理
+     *
+     * @param ctx   ctx
+     * @param cause cause
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        // 打印异常
+        log.info("异常:{}", cause.getMessage());
+        // map中移除channel
+        removeId(ctx);
+        // 关闭连接
+        ctx.close();
+    }
+
+    private void removeId(ChannelHandlerContext ctx) {
+        AttributeKey<String> key = AttributeKey.valueOf("id");
+        // 获取channel中id
+        String id = ctx.channel().attr(key).get();
+        // map移除channel
+        if (CollectionUtil.isNotEmpty(ChannelMap.getChannelMap())) {
+            ChannelMap.getChannelMap().remove(id);
+        } else {
+            log.info("ChannelMap === 为空");
+        }
+    }
+
+    public void channelWrite(Channel channel, Message message) throws Exception {
+        if (null == channel) {
+            throw new RuntimeException("客户端已离线");
+        }
+        channel.writeAndFlush(message);
+    }
+}

+ 12 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/IPushMsgService.java

@@ -0,0 +1,12 @@
+package com.phm.netty.service;
+
+import com.phm.netty.domain.Message;
+
+/**
+ * @Description PushMsgService
+ * @Author WGK
+ * @Date 2023/9/14 14:20
+ */
+public interface IPushMsgService {
+    void push(Message message);
+}

+ 14 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/ISendMsgService.java

@@ -0,0 +1,14 @@
+package com.phm.netty.service;
+
+import com.phm.netty.domain.Message;
+
+/**
+ * @Description ISendMsgService
+ * @Author WGK
+ * @Date 2023/9/15 9:05
+ */
+public interface ISendMsgService {
+    void send(Message message);
+
+    void start();
+}

+ 32 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/PushMsgService.java

@@ -0,0 +1,32 @@
+package com.phm.netty.service.impl;
+
+import com.phm.netty.domain.Message;
+import com.phm.netty.utils.ChannelMap;
+import com.phm.netty.service.IPushMsgService;
+import io.netty.channel.Channel;
+import org.springframework.stereotype.Service;
+
+/**
+ * @Description PushMsgService
+ * @Author WGK
+ * @Date 2023/9/14 14:21
+ */
+@Service
+public class PushMsgService implements IPushMsgService {
+
+    /**
+     * 向客户端发送请求
+     *
+     * @param message 消息
+     */
+    @Override
+    public void push(Message message) {
+        // 客户端ID
+        String id = message.getId();
+        Channel channel = ChannelMap.getChannel(id);
+        if (null == channel) {
+            throw new RuntimeException("客户端已离线");
+        }
+        channel.writeAndFlush(message.toJsonString());
+    }
+}

+ 34 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/SendMsgService.java

@@ -0,0 +1,34 @@
+package com.phm.netty.service.impl;
+
+import com.phm.netty.client.NettyClient;
+import com.phm.netty.domain.Message;
+import com.phm.netty.service.ISendMsgService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @Description SendMsgService 客户端发送给客户端消息
+ * @Author WGK
+ * @Date 2023/9/15 9:06
+ */
+@Service
+public class SendMsgService implements ISendMsgService {
+
+    @Autowired
+    private NettyClient nettyClient;
+
+    /**
+     * 向服务端发送请求
+     *
+     * @param message msg
+     */
+    @Override
+    public void send(Message message) {
+
+    }
+
+    @Override
+    public void start() {
+        nettyClient.start();
+    }
+}

+ 36 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/ChannelMap.java

@@ -0,0 +1,36 @@
+package com.phm.netty.utils;
+
+import io.netty.channel.Channel;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @Description ChannelMap 连接通道信息保存
+ *
+ * @Author WGK
+ * @Date 2023/9/14 14:18
+ */
+public class ChannelMap {
+    /**
+     * 存放客户端标识ID(消息ID)与channel的对应关系
+     */
+    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
+
+    private ChannelMap() {
+    }
+
+    public static ConcurrentHashMap<String, Channel> getChannelMap() {
+        if (null == channelMap) {
+            synchronized (ChannelMap.class) {
+                if (null == channelMap) {
+                    channelMap = new ConcurrentHashMap<>();
+                }
+            }
+        }
+        return channelMap;
+    }
+
+    public static Channel getChannel(String id) {
+        return getChannelMap().get(id);
+    }
+}

+ 26 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/MessageDecodeHandler.java

@@ -0,0 +1,26 @@
+package com.phm.netty.utils;
+
+import com.phm.netty.domain.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.CharsetUtil;
+
+import java.util.List;
+
+/**
+ * @Description MessageDecodeHandler 解码
+ *
+ * @Author WGK
+ * @Date 2023/9/14 11:40
+ */
+public class MessageDecodeHandler extends ByteToMessageDecoder {
+    @Override
+    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
+        ByteBuf frame = byteBuf.retainedDuplicate();
+        final String content = frame.toString(CharsetUtil.UTF_8);
+        Message message = new Message(content);
+        list.add(message);
+        byteBuf.skipBytes(byteBuf.readableBytes());
+    }
+}

+ 21 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/utils/MessageEncodeHandler.java

@@ -0,0 +1,21 @@
+package com.phm.netty.utils;
+
+import com.phm.netty.domain.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.CharsetUtil;
+
+/**
+ * @Description MessageEncodeHandler 编码
+ *
+ * @Author WGK
+ * @Date 2023/9/14 11:42
+ */
+public class MessageEncodeHandler extends MessageToByteEncoder<Message> {
+
+    @Override
+    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
+        out.writeBytes((message.toJsonString()).getBytes(CharsetUtil.UTF_8));
+    }
+}

+ 23 - 0
PHM-admin/phm-netty/src/main/resources/application-netty.yml

@@ -0,0 +1,23 @@
+# netty 配置
+netty-init:
+  # boss线程数量
+  boss: 4
+  # worker线程数量
+  worker: 2
+  # 连接超时时间
+  timeout: 1000
+  # 服务器主端口
+  port: 18000
+  # 服务器备用端口
+  portSalve: 18001
+  # 解码最大长度
+  de-code-size: 2048
+  # 分割符号
+  separator: $@
+  # 服务器地址
+  host: 127.0.0.1
+  #远程端口
+  remote-port: 18000
+  #远程服务器地址
+  remote-host: 127.0.0.1
+