XDecoder.java 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package com.mes.netty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.ByteBufUtil;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.handler.codec.ByteToMessageDecoder;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import java.util.List;
  10. public class XDecoder extends ByteToMessageDecoder {
  11. public static final Logger log = LoggerFactory.getLogger(XDecoder.class);
  12. static final int PACKET_SIZE = 46; // 最短包长度
  13. static final int PACKET_MAX_SIZE = 1000; // 最长包长度
  14. // 用来临时保留没有处理过的请求报文
  15. ByteBuf tempMsg = Unpooled.buffer();
  16. /**
  17. * @param ctx
  18. * @param in 请求的数据
  19. * @param out 将粘在一起的报文拆分后的结果保留起来
  20. * @throws Exception
  21. */
  22. @Override
  23. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  24. log.info(Thread.currentThread() + "收到了一次数据包,长度是:" + in.readableBytes());
  25. String content = hexStringToAscii(ByteBufUtil.hexDump(in));
  26. log.info("接收包内容:" + hexStringToAscii(ByteBufUtil.hexDump(in)));
  27. // 合并报文
  28. ByteBuf message = null;
  29. int tmpMsgSize = tempMsg.readableBytes();
  30. // 如果暂存有上一次余下的请求报文,则合并
  31. if (tmpMsgSize > 0) {
  32. message = Unpooled.buffer();
  33. message.writeBytes(tempMsg);
  34. message.writeBytes(in);
  35. log.info("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
  36. log.info("合并后包内容:" + hexStringToAscii(ByteBufUtil.hexDump(message)));
  37. } else {
  38. message = in;
  39. }
  40. // log.info(Thread.currentThread() + "收到了一次数据包,长度是:" + message.readableBytes());
  41. int size = 0;
  42. String split = "bbbbfffffARW";
  43. while (true){
  44. size = message.readableBytes();
  45. if(size >= PACKET_SIZE){
  46. String retType = "NG:";
  47. int qcsize = 0;
  48. String str = hexStringToAscii(ByteBufUtil.hexDump(message));
  49. String[] lists = str.split(split);
  50. if(lists.length == 1 && size > PACKET_MAX_SIZE){ // 大于最大包长度
  51. byte[] request = new byte[size - PACKET_MAX_SIZE];
  52. // 每次从总的消息中读取n个字节的数据
  53. message.readBytes(request);
  54. // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
  55. out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request))));
  56. break;
  57. }
  58. if(lists[0].length() > 0){
  59. byte[] request = new byte[lists[0].length()];
  60. // 每次从总的消息中读取n个字节的数据
  61. message.readBytes(request);
  62. // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
  63. out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request))));
  64. }else{
  65. if(lists.length > 1){
  66. String ss = lists[1];
  67. if(ss.length() <= 4){
  68. qcsize += ss.length();
  69. }else{
  70. String msgType = ProtocolParam.getMsgType(split+ss);
  71. if(!MesMsgUtils.isMsgTypeOk(msgType)){
  72. qcsize += split.length() + ss.length();
  73. }else{
  74. Integer ppsize = checkResultSize(ss,msgType,split);
  75. if(ppsize > 0){
  76. qcsize = ppsize;
  77. retType = "OK:";
  78. }else{
  79. qcsize += split.length() + ss.length();
  80. }
  81. }
  82. }
  83. }
  84. }
  85. if(qcsize > 0){
  86. byte[] request = new byte[qcsize];
  87. // 每次从总的消息中读取n个字节的数据
  88. message.readBytes(request);
  89. // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
  90. out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request))));
  91. }
  92. }else{
  93. break;
  94. }
  95. }
  96. // 多余的报文存起来
  97. // 第一个报文: i+ 暂存
  98. // 第二个报文: 1 与第一次
  99. size = message.readableBytes();
  100. if (size != 0) {
  101. log.info("多余的数据长度:" + size);
  102. // 剩下来的数据放到tempMsg暂存
  103. tempMsg.clear();
  104. tempMsg.writeBytes(message.readBytes(size));
  105. }
  106. }
  107. private Integer checkResultSize(String str, String msg_type, String split){
  108. String oldstr = str;
  109. str = split+str;
  110. int tpsize = 0;
  111. switch(msg_type) {
  112. case "SYNR": // 同步
  113. if(str.length() >= MesMsgUtils.SYNR_LEN){
  114. tpsize = MesMsgUtils.SYNR_LEN;
  115. }
  116. break;
  117. case "AXTW": // 心跳
  118. if(str.length() >= MesMsgUtils.AXTW_LEN){
  119. tpsize = MesMsgUtils.AXTW_LEN;
  120. }
  121. break;
  122. case "ACLW": // 重连
  123. if(str.length() >= MesMsgUtils.ACLW_LEN){
  124. tpsize = MesMsgUtils.ACLW_LEN;
  125. }
  126. break;
  127. case "MCJW":
  128. case "AQDW":
  129. case "MBDW":
  130. case "MJBW":
  131. case "MQDW":
  132. case "MKSW":
  133. case "MSBW":
  134. case "MCSW":
  135. case "AQRW":
  136. default: // 默认新报文
  137. if(str.length() >= ProtocolParam.fixedLength){ // 大于固定长度
  138. tpsize = ProtocolParam.fixedLength;
  139. }
  140. break;
  141. }
  142. return tpsize;
  143. }
  144. // 16字符串转Ascii
  145. private String hexStringToAscii(String hexString) {
  146. StringBuilder sbuilder = new StringBuilder();
  147. for (int i = 0; i < hexString.length(); i += 2) {
  148. String hexByte = hexString.substring(i, i + 2);
  149. int byteValue = Integer.parseInt(hexByte, 16);
  150. char c = (char) byteValue;
  151. sbuilder.append(c);
  152. }
  153. return sbuilder.toString();
  154. }
  155. }