Jelajahi Sumber

保存消息通道

wanggaokun 1 tahun lalu
induk
melakukan
90daca1166

+ 12 - 0
PHM-admin/phm-manage/src/main/java/com/phm/manage/domain/OrderInfo.java

@@ -26,6 +26,18 @@ public class OrderInfo extends BaseEntity {
     @JsonSerialize(using = ToStringSerializer.class)
     private long id;
 
+    /**
+     * 指令批次
+     */
+    @Excel(name = "指令批次")
+    private String orderBatchId;
+
+    /**
+     * 指令编码
+     */
+    @Excel(name = "指令编码")
+    private String orderCode;
+
     /**
      * 指令名称
      */

+ 5 - 0
PHM-admin/phm-netty/src/main/java/com/phm/netty/constant/MsgConstant.java

@@ -16,4 +16,9 @@ public class MsgConstant {
      * SPHM
      */
     public static final String SPHM = "SPHM";
+
+    /**
+     * 客户端固定ID
+     */
+    public static final String CLIENT_ID = GPHM+SPHM;
 }

+ 2 - 2
PHM-admin/phm-netty/src/main/java/com/phm/netty/domain/Message.java

@@ -94,7 +94,7 @@ public class Message implements Serializable {
     }
 
     public static void main(String[] args) {
-        byte[] test1 = Message.testMsg4();
+        byte[] test1 = Message.testMsg();
 
         // 使用Arrays.toString()方法打印字节数组
         System.out.println("使用Arrays.toString()方法打印字节数组: " + Arrays.toString(test1));
@@ -108,7 +108,7 @@ public class Message implements Serializable {
 
     public static byte[] testMsg() {
         Message message = new Message();
-        message.setType(OrderEnum.COMMON.getType()).setTarget("GPHM").setSource("SPHM").setData("{\"id\": \"ZL001\",\"sortieNo\":\"JH001\"}");
+        message.setType(OrderEnum.COMMON.getType()).setTarget("GPHM").setSource("SPHM").setData("{\"id\": \"ZL001\",\"orderCode\":\"ZL001\"}");
         return Message.msgToBytes(message);
     }
 

+ 41 - 8
PHM-admin/phm-netty/src/main/java/com/phm/netty/server/handler/ByteArrayMessageHandler.java

@@ -3,6 +3,8 @@ package com.phm.netty.server.handler;
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.util.ObjectUtil;
+import com.alibaba.fastjson2.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.phm.manage.domain.OrderConfig;
 import com.phm.manage.domain.common.CommonResult;
 import com.phm.netty.constant.MsgConstant;
@@ -18,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.util.AttributeKey;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -50,7 +53,8 @@ public class ByteArrayMessageHandler extends SimpleChannelInboundHandler<byte[]>
      */
     @Override
     public void handlerAdded(ChannelHandlerContext ctx) {
-        log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
+        String idStr = ctx.channel().id().asLongText();
+        log.info("有新的连接:[{}]", idStr);
         log.info("校时请求");
         Message msg = new Message();
         msg.setType(OrderEnum.TIMING.getType()).setData("{\"cmdId\": \"timing\"}");
@@ -60,7 +64,7 @@ public class ByteArrayMessageHandler extends SimpleChannelInboundHandler<byte[]>
     /**
      * 客户端消息处理
      *
-     * @param ctx ctx
+     * @param ctx        ctx
      * @param binaryData binaryData
      * @throws Exception Exception
      */
@@ -84,7 +88,7 @@ public class ByteArrayMessageHandler extends SimpleChannelInboundHandler<byte[]>
         Message.bytesToMsg(msg, subArray, len);
         short type = msg.getType();
         Message lastMsg = null;
-        if (OrderEnum.COMMON.getType() == type) {
+        if (OrderEnum.COMMON.getType() == type && saveChannel(ctx, msg)) {
             // 通用指令
             lastMsg = byteArrayMessageHandler.processService.orderHandle(msg);
         }
@@ -96,16 +100,16 @@ public class ByteArrayMessageHandler extends SimpleChannelInboundHandler<byte[]>
             // 参数传递
             List<OrderConfig> configList = byteArrayMessageHandler.processService.getParameterConfig(msg);
             // 分批处理
-            List<List<OrderConfig>> batches  = CollUtil.split(configList, 10);
+            List<List<OrderConfig>> batches = CollUtil.split(configList, 10);
             Message msgParameter = new Message();
             for (List<OrderConfig> batch : batches) {
                 msgParameter.setType(OrderEnum.ORDER_CONFIG.getType()).setData(JsonUtils.convertJson(CommonResult.success(batch)));
-                log.info("msgParameter==={}",msgParameter);
+                log.info("msgParameter==={}", msgParameter);
                 ctx.writeAndFlush(Message.msgToBytes(msgParameter));
             }
             // 结束传输标识
             msgParameter.setType(OrderEnum.ORDER_CONFIG.getType()).setData(JsonUtils.convertJson(CommonResult.complete("{\"cmdId\":\"complete\"}")));
-            log.info("msgParameter end ==={}",msgParameter);
+            log.info("msgParameter end ==={}", msgParameter);
             ctx.writeAndFlush(Message.msgToBytes(msgParameter));
             return;
         }
@@ -117,6 +121,36 @@ public class ByteArrayMessageHandler extends SimpleChannelInboundHandler<byte[]>
         ctx.writeAndFlush(Message.msgToBytes(lastMsg));
     }
 
+    /**
+     * 保存消息通道 信息
+     * @param ctx ctx
+     * @param msg msg
+     * @return res
+     * @throws JsonProcessingException ex
+     */
+    private boolean saveChannel(ChannelHandlerContext ctx, Message msg) throws JsonProcessingException {
+        Message lastMsg = new Message();
+        JSONObject jsonObject = JSONObject.parseObject(msg.getData());
+        String code = jsonObject.getString("id");
+        String orderCode = jsonObject.getString("orderCode");
+        if (StringUtils.isBlank(code) || StringUtils.isBlank(orderCode)) {
+            lastMsg.setType(OrderEnum.RESPONSE.getType()).setData(JsonUtils.convertJson(CommonResult.error("id and orderCode cannot be empty")));
+            log.info("saveChannel lastMsg==={}", lastMsg);
+            ctx.writeAndFlush(Message.msgToBytes(lastMsg));
+            return false;
+        }
+        String idStr = MsgConstant.CLIENT_ID;
+        if (ObjectUtil.isNotEmpty(ChannelMap.getChannel(idStr))) {
+            log.info("已有消息通道");
+            return true;
+        }
+        ChannelMap.getChannelMap().put(idStr, ctx.channel());
+        // 将客户端ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
+        AttributeKey<String> key = AttributeKey.valueOf("id");
+        ctx.channel().attr(key).setIfAbsent(idStr);
+        return true;
+    }
+
     /**
      * 设备下线处理
      *
@@ -151,9 +185,8 @@ public class ByteArrayMessageHandler extends SimpleChannelInboundHandler<byte[]>
         String id = ctx.channel().attr(key).get();
         // map移除channel
         if (CollectionUtil.isNotEmpty(ChannelMap.getChannelMap())) {
+            log.info("删除channel: {}", id);
             ChannelMap.getChannelMap().remove(id);
-        } else {
-            log.info("ChannelMap === 为空");
         }
     }
 

+ 9 - 6
PHM-admin/phm-netty/src/main/java/com/phm/netty/service/impl/ProcessService.java

@@ -44,16 +44,21 @@ public class ProcessService implements IProcessService {
     @Override
     public Message orderHandle(Message message) throws Exception {
         JSONObject jsonObject = JSONObject.parseObject(message.getData());
-        String code = jsonObject.getString("id");
-        OrderConfig orderConfig = orderConfigService.selectOrderConfigByCode(code);
+        String orderBatchId = jsonObject.getString("id");
+        String orderCode = jsonObject.getString("orderCode");
+        OrderConfig orderConfig = orderConfigService.selectOrderConfigByCode(orderCode);
+        // 保存要执行的指令信息
         OrderInfo orderInfo = new OrderInfo();
+        orderInfo.setOrderBatchId(orderBatchId);
+        orderInfo.setOrderCode(orderConfig.getOrderCode());
         orderInfo.setOrderName(orderConfig.getName());
         orderInfo.setOrderType(orderConfig.getType());
         // 设置待执行状态
         orderInfo.setStatus(OrderStatus.S_0.getCode());
         orderInfoService.insertOrderInfo(orderInfo);
         Message msg = new Message();
-        msg.setType(OrderEnum.RESPONSE.getType()).setData(JsonUtils.convertJson(CommonResult.success("{\"id\": " + code + "}")));
+        msg.setType(OrderEnum.RESPONSE.getType())
+                .setData(JsonUtils.convertJson(CommonResult.success("{\"id\": " + orderBatchId + ",orderCode:" + orderCode + "}")));
         return msg;
     }
 
@@ -83,8 +88,6 @@ public class ProcessService implements IProcessService {
 
     @Override
     public List<OrderConfig> getParameterConfig(Message message) throws Exception {
-        List<OrderConfig> configs = orderConfigService.selectOrderConfigList(null);
-
-        return configs;
+        return orderConfigService.selectOrderConfigList(null);
     }
 }