UDP分包重组算法

    技术2022-05-19  22

    UDP分包重组算法(BTW:Windows下用IOCP来发也可能会有同样的问题,所以本文同样适用于TCP - IOCP下的分包及重组) Packet.h

    #include "InetAddr.h" //对socket地址操作封装的类,比如char*IP转成ULONG   #include <vector>   using   namespace  std;    //先定义包头 typedef   struct  _HeadExt  {      //每帧所有分片公用信息       char    flag[5]; //可以任意字符或数字或其他方法,用来标志我们发送的数据       UINT    m_nSeqNumber; //本帧序数       USHORT  m_nTotalFragment; //本帧数据可分成的总片数       USHORT  m_nTotalSize;  //本帧数据总长度       //每片各自信息       USHORT  m_nFragmentIndex;  //每帧分片序数,0 1 2 ... ,/       USHORT  m_usPayloadSize; //本片数据长度       USHORT  m_usFragOffset; //本片数据相对总数据的偏移量       USHORT  m_bLastFragment; //是否最后一帧   //上 述数据有些重复,可以精简,有时间再修改    }HeadExt;        //从socket接收到的数据   class   PacketIn  {  public  :      PacketIn(char * lpszBuffer=NULL,  UINT  usBufferSize=0,  UINT  nDataSize=0);      virtual  ~PacketIn();      HeadExt head;      BYTE *    m_lpszBuffer;      ULONG    ip;      UINT     port;      CVLInetAddr addr;      BOOL  Normalize();      UINT    m_nDataLen;      UINT    m_usBufferSize;    };    //接收到数据后开始重组使用的一个中间缓冲区,多个PacketIn合成一个Packet   //发送时从一个Packet分割出多片,不过本程序里直接发送,没有封成Packet   class  Packet   { public :      enum  { TIMEOUT_LOCKFRAGMENTS = 1000 };     Packet( );     ~Packet();     void  reset();     void  Set( const  CVLInetAddr& iaFrom,  UINT  nSeqNumber );     BOOL  InsertFragment(PacketIn*  const  pFragment);     bool  m_bUsed;      int  recvedpacks;      int  recvedbytes;      int  seqnum;      BYTE * m_pBuffer;  //测试用程序,直接访问了类的变量,没有封装成函数。上述变量正常应该写成 SetXXX ,GetXXX      private :      BOOL  IsFragComplete()  const ;     ULONG      m_ip;     USHORT    m_port;      UINT         m_nSeqNumber;      vector<int > SeqNumberList;  }; 

    Packet.cpp

    #include "Packet.h"     PacketIn::PacketIn(char * lpszBuffer /*=NULL*/UINT  usBufferSize /*=0*/ ,   UINT  nDataSize /*=0*/ )  {        m_lpszBuffer   = (BYTE *)lpszBuffer;      m_usBufferSize = usBufferSize; //数据最大长度,其实没什么用,已经设置了这个值最大为64000       m_nDataLen = nDataSize;  }  PacketIn::~PacketIn()  {  }  BOOL  PacketIn::Normalize()   //判断收到的数据是否有效   {      const   USHORT  usHeaderSize =  sizeof (HeadExt);      if  ( !m_lpszBuffer || m_usBufferSize < usHeaderSize ) //没什么用       {          return  FALSE;      }        HeadExt* pHeader = (HeadExt*)( m_lpszBuffer  );      if  ( pHeader->m_usPayloadSize != m_nDataLen - usHeaderSize )      {          return  FALSE;      }        head = *pHeader;      if  ( pHeader->m_usPayloadSize > 0 )      {  memmove( m_lpszBuffer, m_lpszBuffer + usHeaderSize, pHeader->m_usPayloadSize );      }           return  TRUE;  }    /   //这里是重组数据的临时缓冲,只看这里必然迷惑,先看socket收数据那里,再回来查看   void  Packet::Set(  const  CVLInetAddr& iaFrom,  UINT  nSeqNumber )  {      m_iaFrom =  iaFrom;      m_nSeqNumber = nSeqNumber;      if (m_pBuffer)          delete  []m_pBuffer;  }    void  Packet::reset()  {      m_bUsed = false ;      recvedpacks = 0;      seqnum = 0;      recvedbytes = 0;      m_pBuffer = NULL;      m_pBuffer = new   BYTE [64000];      SeqNumberList.clear();  }  Packet::Packet()  {      //calculate the ttl       //m_nTTL = RUDP_REASSEMBLE_TIMEOUT*CRudpPeer::PR_SLOWHZ;       reset();  }    Packet::~Packet()  {      if (m_pBuffer)          delete  []m_pBuffer;  }    BOOL  Packet::InsertFragment(PacketIn*  const  pFragment)  {      int  nSize = SeqNumberList.size();      for ( int  i = 0; i< nSize ;i ++)      {          if (nSize ==SeqNumberList[i] ) //收到重复数据包           {              return  FALSE;          }      }      SeqNumberList.push_back(pFragment->head.m_nFragmentIndex);        memcpy( m_pBuffer + pFragment->head.m_usFragOffset,  pFragment->m_lpszBuffer, pFragment->head.m_usPayloadSize);      recvedbytes += pFragment->head.m_usPayloadSize;      recvedpacks++;      m_bUsed = true ;        CString str;  str.Format("收到数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,  m_usFragOffset %d,m_nTotalSize %d,recvedbytes %d/r/n",  pFragment->head.m_nSeqNumber,pFragment->head.m_nTotalFragment,  pFragment->head.m_nFragmentIndex,pFragment->head.m_usFragOffset,  pFragment->head.m_nTotalSize,pFragment->head.m_usPayloadSize);          OutputDebugString(str);        if (recvedbytes == pFragment->head.m_nTotalSize )          return  TRUE;           return  FALSE;  } 

    发送时的操作:

    #iclude "packet.h"     const   int  RTP_SPLIT_PACKSIZE = 1300; //udp一般最大包为1500,一般这里都设置为1400,当然1300也没错   LONGLONG  index = rand(); //帧序数,从一个随机数开始,我机器上生成的是41   //分包发送,逻辑相当简单,按最常规的方法分割发送   void  Send( BYTE * pBuff, UINT  nLen, ULONG  ip, USHORT  port)  {      HeadExt head;      strcpy(head.flag ,"aaaa" );             head.m_nSeqNumber = index++; //帧序数         head.m_nTotalFragment = nLen/RTP_SPLIT_PACKSIZE;      head.m_nTotalFragment  += nLen%RTP_SPLIT_PACKSIZE?1:0; //整除的话就是0,否则多出一个尾数据       head.m_nFragmentIndex = 0;//第一个分段从0开始       head.m_nTotalSize = nLen;      head.m_bLastFragment = 0;      head.m_usFragOffset = 0;        char  tem[RTP_SPLIT_PACKSIZE+ sizeof (HeadExt)];        if (head.m_nTotalFragment == 1)      {          memcpy(tem,&head,sizeof (HeadExt));          memcpy(tem+sizeof (HeadExt),pBuff,nLen);          head.m_usPayloadSize = nLen;    //用UDP发送,常规做法,先对UDP做一个封装的类,这里调用类对象。本代码只说明原理,类似伪代友          sendto(tem,nLen+sizeof (HeadExt),ip,port);            CString str;          str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d, m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n" ,  head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex, head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize);          OutputDebugString(str);            return ;      }        int  i = 0;      for ( i = 0; i < head.m_nTotalFragment-1; i++) //开始分割,最后一个单独处理       {          head.m_bLastFragment = 0;          head.m_nFragmentIndex = i;          head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE;          head.m_usPayloadSize = RTP_SPLIT_PACKSIZE;          memcpy(tem,&head,sizeof (HeadExt));          memcpy(tem+sizeof (HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,RTP_SPLIT_PACKSIZE);         sendto(tem,nLen+sizeof (HeadExt),ip,port);            CString str;          str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d, m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n" ,  head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex, head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize);          OutputDebugString(str);        }        head.m_bLastFragment = 1;      head.m_nFragmentIndex = i;      head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE;      head.m_usPayloadSize = nLen - (i*RTP_SPLIT_PACKSIZE);        memcpy(tem,&head,sizeof (HeadExt));      memcpy(tem+sizeof (HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,nLen - (i*RTP_SPLIT_PACKSIZE));        sendto(tem,nLen+sizeof (HeadExt),ip,port);        CString str;  str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d, m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n" ,head.m_nSeqNumber, head.m_nTotalFragment,head.m_nFragmentIndex,head.m_usFragOffset, head.m_nTotalSize,head.m_usPayloadSize);   OutputDebugString(str);  } 

    接收数据的回调 //代码很简单,收到数据后,形成PacketIn,然后找一个链表,插进去 //因为收到的数据的顺序是不定的,所以先创建一个链表,每帧首数据到来时,分配一个节点,其他分片到来的时候,都加到//这个节点里,等数据都到齐了,这个节点就是一个完整的包,回调出去。

    void  RecvCallback( BYTE * pBuff, UINT  nLen, ULONG  ip, USHORT  port)  {      if (nLen <  sizeof (HeadExt))          return ;     HeadExt* pHead = (HeadExt*)pBuff;      CString str = pHead->flag;      if (str !=  "aaaa" )      {         return ;      }     if (pHead->m_nTotalFragment == 1) //只有一帧,不分片       {          //回调,上层处理           if (m_pfDispatch) m_pfDispatch(pBuff+sizeof (HeadExt),nLen- sizeof (HeadExt),ip,port,m_lpParam);         return ;      }        PacketIn data( (char *)pBuff, 64000, nLen );      if  ( !data.Normalize() )          return ;     if  ( data.head.m_nTotalFragment>1 )      {         Packet* pTemp = GetPartialPacket( iaRemote, data.head.m_nSeqNumber );          if  ( pTemp )          {              if  ( pTemp ->InsertFragment( &data ) )              {                  m_pfDispatch(pTemp ->m_pBuffer,                      pTemp ->recvedbytes,ip,port,m_lpParam);              }          }//end of if       }  } 

    //上面用到的一些变量可以在一个.h里面定义,比如: Packet TempPacket[16];//分配一个数组,每个节点是一个重组后临时缓冲 Packet* GetPartialPacket(const CVLInetAddr& iaFrom, UINT nSeqNumber);//从数组里取出一个缓冲节点

    //根据这几个关键字查找,不过只用   //到了nSeqNumber,要是3人以上的视频聊天,ip和port是必要的   Packet* GetPartialPacket(const   ULONG  ip, USHORT  port,  UINT  nSeqNumber)  {      Packet* tmp = NULL;      int  i=0;      while (tmp==NULL && i<16)      {          //该包所属的帧已有其它数据包到达           if (TempPacket[i].seqnum==nSeqNumber && TempPacket[i].recvedpacks>0)          {              tmp = &TempPacket[i];              break ;          }          i++;      }      if (tmp == NULL)         //新的帧       {          //查找空闲元素           for (i=0;i<16;i++)          {              if (!TempPacket[i].m_bUsed)                  break ;          }            if (i>=16)          {              //没有空闲的元素,丢掉一个最早               tmp = &TempPacket[0];              int  j = 0;              for (i=1;i<16;i++)              {                  if (TempPacket[i].seqnum < tmp->seqnum)                  {                      tmp = &TempPacket[i];                      j = i;                  }              }              //找到最早的一帧               if (tmp->m_pBuffer)              {                  delete  []tmp->m_pBuffer;                  tmp->reset();              }          }          else               tmp = &TempPacket[i];      }        InsertFragment      tmp->m_bUsed = true ;      tmp->seqnum = nSeqNumber;        return  tmp;  } 

    整个示例最重要的是两个函数: GetPartialPacket:取到一个临时缓冲节点 InsertFragment:把收到的每个数据段插入临时缓冲组成一个完整帧 当然发送方也要按一定规则分割发送。


    最新回复(0)