| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- package com.mes.netty;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.ByteBufUtil;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.List;
- public class XDecoder extends ByteToMessageDecoder {
- public static final Logger log = LoggerFactory.getLogger(XDecoder.class);
- static final int PACKET_SIZE = 46; // 最短包长度
- static final int PACKET_MAX_SIZE = 1000; // 最长包长度
- // 用来临时保留没有处理过的请求报文
- ByteBuf tempMsg = Unpooled.buffer();
- /**
- * @param ctx
- * @param in 请求的数据
- * @param out 将粘在一起的报文拆分后的结果保留起来
- * @throws Exception
- */
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- log.info(Thread.currentThread() + "收到了一次数据包,长度是:" + in.readableBytes());
- String content = hexStringToAscii(ByteBufUtil.hexDump(in));
- log.info("接收包内容:" + hexStringToAscii(ByteBufUtil.hexDump(in)));
- // 合并报文
- ByteBuf message = null;
- int tmpMsgSize = tempMsg.readableBytes();
- // 如果暂存有上一次余下的请求报文,则合并
- if (tmpMsgSize > 0) {
- message = Unpooled.buffer();
- message.writeBytes(tempMsg);
- message.writeBytes(in);
- log.info("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
- log.info("合并后包内容:" + hexStringToAscii(ByteBufUtil.hexDump(message)));
- } else {
- message = in;
- }
- // log.info(Thread.currentThread() + "收到了一次数据包,长度是:" + message.readableBytes());
- int size = 0;
- String split = "bbbbfffffARW";
- while (true){
- size = message.readableBytes();
- if(size >= PACKET_SIZE){
- String retType = "NG:";
- int qcsize = 0;
- String str = hexStringToAscii(ByteBufUtil.hexDump(message));
- String[] lists = str.split(split);
- if(lists.length == 1 && size > PACKET_MAX_SIZE){ // 大于最大包长度
- byte[] request = new byte[size - PACKET_MAX_SIZE];
- // 每次从总的消息中读取n个字节的数据
- message.readBytes(request);
- // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
- out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request))));
- break;
- }
- if(lists[0].length() > 0){
- byte[] request = new byte[lists[0].length()];
- // 每次从总的消息中读取n个字节的数据
- message.readBytes(request);
- // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
- out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request))));
- }else{
- if(lists.length > 1){
- String ss = lists[1];
- if(ss.length() <= 4){
- qcsize += ss.length();
- }else{
- String msgType = ProtocolParam.getMsgType(split+ss);
- if(!MesMsgUtils.isMsgTypeOk(msgType)){
- qcsize += split.length() + ss.length();
- }else{
- Integer ppsize = checkResultSize(ss,msgType,split);
- if(ppsize > 0){
- qcsize = ppsize;
- retType = "OK:";
- }else{
- qcsize += split.length() + ss.length();
- }
- }
- }
- }
- }
- if(qcsize > 0){
- byte[] request = new byte[qcsize];
- // 每次从总的消息中读取n个字节的数据
- message.readBytes(request);
- // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
- out.add(retType+hexStringToAscii(ByteBufUtil.hexDump(Unpooled.copiedBuffer(request))));
- }
- }else{
- break;
- }
- }
- // 多余的报文存起来
- // 第一个报文: i+ 暂存
- // 第二个报文: 1 与第一次
- size = message.readableBytes();
- if (size != 0) {
- log.info("多余的数据长度:" + size);
- // 剩下来的数据放到tempMsg暂存
- tempMsg.clear();
- tempMsg.writeBytes(message.readBytes(size));
- }
- }
- private Integer checkResultSize(String str, String msg_type, String split){
- String oldstr = str;
- str = split+str;
- int tpsize = 0;
- switch(msg_type) {
- case "SYNR": // 同步
- if(str.length() >= MesMsgUtils.SYNR_LEN){
- tpsize = MesMsgUtils.SYNR_LEN;
- }
- break;
- case "AXTW": // 心跳
- if(str.length() >= MesMsgUtils.AXTW_LEN){
- tpsize = MesMsgUtils.AXTW_LEN;
- }
- break;
- case "ACLW": // 重连
- if(str.length() >= MesMsgUtils.ACLW_LEN){
- tpsize = MesMsgUtils.ACLW_LEN;
- }
- break;
- case "MCJW":
- case "AQDW":
- case "MBDW":
- case "MJBW":
- case "MQDW":
- case "MKSW":
- case "MSBW":
- case "MCSW":
- case "AQRW":
- default: // 默认新报文
- if(str.length() >= ProtocolParam.fixedLength){ // 大于固定长度
- tpsize = ProtocolParam.fixedLength;
- }
- break;
- }
- return tpsize;
- }
- // 16字符串转Ascii
- private String hexStringToAscii(String hexString) {
- StringBuilder sbuilder = new StringBuilder();
- for (int i = 0; i < hexString.length(); i += 2) {
- String hexByte = hexString.substring(i, i + 2);
- int byteValue = Integer.parseInt(hexByte, 16);
- char c = (char) byteValue;
- sbuilder.append(c);
- }
- return sbuilder.toString();
- }
- }
|