XDecoder.java 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package com.mes.netty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.ByteBufUtil;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.handler.codec.ByteToMessageDecoder;
  7. import java.util.List;
  8. public class XDecoder extends ByteToMessageDecoder {
  9. static final int PACKET_SIZE = 46; // 最短包长度
  10. static final int PACKET_MAX_SIZE = 1000; // 最长包长度
  11. // 用来临时保留没有处理过的请求报文
  12. ByteBuf tempMsg = Unpooled.buffer();
  13. /**
  14. * @param ctx
  15. * @param in 请求的数据
  16. * @param out 将粘在一起的报文拆分后的结果保留起来
  17. * @throws Exception
  18. */
  19. @Override
  20. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  21. System.out.println(Thread.currentThread() + "收到了一次数据包,长度是:" + in.readableBytes());
  22. String content = hexStringToAscii(ByteBufUtil.hexDump(in));
  23. System.out.println("接收包内容:" + hexStringToAscii(ByteBufUtil.hexDump(in)));
  24. // 合并报文
  25. ByteBuf message = null;
  26. int tmpMsgSize = tempMsg.readableBytes();
  27. // 如果暂存有上一次余下的请求报文,则合并
  28. if (tmpMsgSize > 0) {
  29. message = Unpooled.buffer();
  30. message.writeBytes(tempMsg);
  31. message.writeBytes(in);
  32. System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
  33. System.out.println("合并后包内容:" + hexStringToAscii(ByteBufUtil.hexDump(message)));
  34. } else {
  35. message = in;
  36. }
  37. // System.out.println(Thread.currentThread() + "收到了一次数据包,长度是:" + message.readableBytes());
  38. int size = 0;
  39. String split = "bbbbfffffARW";
  40. while (true){
  41. size = message.readableBytes();
  42. if(size >= PACKET_SIZE){
  43. String retType = "NG:";
  44. int qcsize = 0;
  45. String str = hexStringToAscii(ByteBufUtil.hexDump(message));
  46. String[] lists = str.split(split);
  47. if(lists.length == 1 && size > PACKET_MAX_SIZE){ // 大于最大包长度
  48. byte[] request = new byte[size - PACKET_MAX_SIZE];
  49. // 每次从总的消息中读取n个字节的数据
  50. message.readBytes(request);
  51. // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
  52. out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request))));
  53. break;
  54. }
  55. if(lists[0].length() > 0){
  56. byte[] request = new byte[lists[0].length()];
  57. // 每次从总的消息中读取n个字节的数据
  58. message.readBytes(request);
  59. // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
  60. out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request))));
  61. }else{
  62. if(lists.length > 1){
  63. String ss = lists[1];
  64. if(ss.length() <= 4){
  65. qcsize += ss.length();
  66. }else{
  67. String msgType = ProtocolParam.getMsgType(split+ss);
  68. if(!MesMsgUtils.isMsgTypeOk(msgType)){
  69. qcsize += split.length() + ss.length();
  70. }else{
  71. Integer ppsize = checkResultSize(ss,msgType,split);
  72. if(ppsize > 0){
  73. qcsize = ppsize;
  74. retType = "OK:";
  75. }else{
  76. qcsize += split.length() + ss.length();
  77. }
  78. }
  79. }
  80. }
  81. }
  82. if(qcsize > 0){
  83. byte[] request = new byte[qcsize];
  84. // 每次从总的消息中读取n个字节的数据
  85. message.readBytes(request);
  86. // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
  87. out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request))));
  88. }
  89. }else{
  90. break;
  91. }
  92. }
  93. // 多余的报文存起来
  94. // 第一个报文: i+ 暂存
  95. // 第二个报文: 1 与第一次
  96. size = message.readableBytes();
  97. if (size != 0) {
  98. System.out.println("多余的数据长度:" + size);
  99. // 剩下来的数据放到tempMsg暂存
  100. tempMsg.clear();
  101. tempMsg.writeBytes(message.readBytes(size));
  102. }
  103. }
  104. private Integer checkResultSize(String str, String msg_type, String split){
  105. String oldstr = str;
  106. str = split+str;
  107. int tpsize = 0;
  108. switch(msg_type) {
  109. case "SYNR": // 同步
  110. if(str.length() >= MesMsgUtils.SYNR_LEN){
  111. tpsize = MesMsgUtils.SYNR_LEN;
  112. }
  113. break;
  114. case "AXTW": // 心跳
  115. if(str.length() >= MesMsgUtils.AXTW_LEN){
  116. tpsize = MesMsgUtils.AXTW_LEN;
  117. }
  118. break;
  119. case "ACLW": // 重连
  120. if(str.length() >= MesMsgUtils.ACLW_LEN){
  121. tpsize = MesMsgUtils.ACLW_LEN;
  122. }
  123. break;
  124. case "MCJW":
  125. case "AQDW":
  126. case "MBDW":
  127. case "MJBW":
  128. case "MQDW":
  129. case "MKSW":
  130. case "MSBW":
  131. case "MCSW":
  132. case "AQRW":
  133. default: // 默认新报文
  134. if(str.length() >= ProtocolParam.fixedLength){ // 大于固定长度
  135. tpsize = ProtocolParam.fixedLength;
  136. }
  137. break;
  138. }
  139. return tpsize;
  140. }
  141. // 16字符串转Ascii
  142. private String hexStringToAscii(String hexString) {
  143. StringBuilder sbuilder = new StringBuilder();
  144. for (int i = 0; i < hexString.length(); i += 2) {
  145. String hexByte = hexString.substring(i, i + 2);
  146. int byteValue = Integer.parseInt(hexByte, 16);
  147. char c = (char) byteValue;
  148. sbuilder.append(c);
  149. }
  150. return sbuilder.toString();
  151. }
  152. }