package com.mes.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class XDecoder extends ByteToMessageDecoder { static final int PACKET_SIZE = 46; // 最短包长度 static final int PACKET_MAX_SIZE = 1000; // 最长包长度 // 用来临时保留没有处理过的请求报文 ByteBuf tempMsg = Unpooled.buffer(); /** * @param ctx * @param in 请求的数据 * @param out 将粘在一起的报文拆分后的结果保留起来 * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { System.out.println(Thread.currentThread() + "收到了一次数据包,长度是:" + in.readableBytes()); String content = hexStringToAscii(ByteBufUtil.hexDump(in)); System.out.println("接收包内容:" + hexStringToAscii(ByteBufUtil.hexDump(in))); // 合并报文 ByteBuf message = null; int tmpMsgSize = tempMsg.readableBytes(); // 如果暂存有上一次余下的请求报文,则合并 if (tmpMsgSize > 0) { message = Unpooled.buffer(); message.writeBytes(tempMsg); message.writeBytes(in); System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes()); System.out.println("合并后包内容:" + hexStringToAscii(ByteBufUtil.hexDump(message))); } else { message = in; } // System.out.println(Thread.currentThread() + "收到了一次数据包,长度是:" + message.readableBytes()); int size = 0; String split = "bbbbfffffARW"; while (true){ size = message.readableBytes(); if(size >= PACKET_SIZE){ String retType = "NG:"; int qcsize = 0; String str = hexStringToAscii(ByteBufUtil.hexDump(message)); String[] lists = str.split(split); if(lists.length == 1 && size > PACKET_MAX_SIZE){ // 大于最大包长度 byte[] request = new byte[size - PACKET_MAX_SIZE]; // 每次从总的消息中读取n个字节的数据 message.readBytes(request); // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理 out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request)))); break; } if(lists[0].length() > 0){ byte[] request = new byte[lists[0].length()]; // 每次从总的消息中读取n个字节的数据 message.readBytes(request); // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理 out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request)))); }else{ if(lists.length > 1){ String ss = lists[1]; if(ss.length() <= 4){ qcsize += ss.length(); }else{ String msgType = ProtocolParam.getMsgType(split+ss); if(!MesMsgUtils.isMsgTypeOk(msgType)){ qcsize += split.length() + ss.length(); }else{ Integer ppsize = checkResultSize(ss,msgType,split); if(ppsize > 0){ qcsize = ppsize; retType = "OK:"; }else{ qcsize += split.length() + ss.length(); } } } } } if(qcsize > 0){ byte[] request = new byte[qcsize]; // 每次从总的消息中读取n个字节的数据 message.readBytes(request); // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理 out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request)))); } }else{ break; } } // 多余的报文存起来 // 第一个报文: i+ 暂存 // 第二个报文: 1 与第一次 size = message.readableBytes(); if (size != 0) { System.out.println("多余的数据长度:" + size); // 剩下来的数据放到tempMsg暂存 tempMsg.clear(); tempMsg.writeBytes(message.readBytes(size)); } } private Integer checkResultSize(String str, String msg_type, String split){ String oldstr = str; str = split+str; int tpsize = 0; switch(msg_type) { case "SYNR": // 同步 if(str.length() >= MesMsgUtils.SYNR_LEN){ tpsize = MesMsgUtils.SYNR_LEN; } break; case "AXTW": // 心跳 if(str.length() >= MesMsgUtils.AXTW_LEN){ tpsize = MesMsgUtils.AXTW_LEN; } break; case "ACLW": // 重连 if(str.length() >= MesMsgUtils.ACLW_LEN){ tpsize = MesMsgUtils.ACLW_LEN; } break; case "MCJW": case "AQDW": case "MBDW": case "MJBW": case "MQDW": case "MKSW": case "MSBW": case "MCSW": case "AQRW": default: // 默认新报文 if(str.length() >= ProtocolParam.fixedLength){ // 大于固定长度 tpsize = ProtocolParam.fixedLength; } break; } return tpsize; } // 16字符串转Ascii private String hexStringToAscii(String hexString) { StringBuilder sbuilder = new StringBuilder(); for (int i = 0; i < hexString.length(); i += 2) { String hexByte = hexString.substring(i, i + 2); int byteValue = Integer.parseInt(hexByte, 16); char c = (char) byteValue; sbuilder.append(c); } return sbuilder.toString(); } }