UDP分包重组算法(BTW:Windows下用IOCP来发也可能会有同样的问题,所以本文同样适用于TCP - IOCP下的分包及重组) Packet.h
#include "InetAddr.h" //对socket地址操作封装的类,比如char*IP转成ULONG #include <vector> using namespace std; //先定义包头 typedef struct _HeadExt { //每帧所有分片公用信息 char flag[5]; //可以任意字符或数字或其他方法,用来标志我们发送的数据 UINT m_nSeqNumber; //本帧序数 USHORT m_nTotalFragment; //本帧数据可分成的总片数 USHORT m_nTotalSize; //本帧数据总长度 //每片各自信息 USHORT m_nFragmentIndex; //每帧分片序数,0 1 2 ... ,/ USHORT m_usPayloadSize; //本片数据长度 USHORT m_usFragOffset; //本片数据相对总数据的偏移量 USHORT m_bLastFragment; //是否最后一帧 //上 述数据有些重复,可以精简,有时间再修改 }HeadExt; //从socket接收到的数据 class PacketIn { public : PacketIn(char * lpszBuffer=NULL, UINT usBufferSize=0, UINT nDataSize=0); virtual ~PacketIn(); HeadExt head; BYTE * m_lpszBuffer; ULONG ip; UINT port; CVLInetAddr addr; BOOL Normalize(); UINT m_nDataLen; UINT m_usBufferSize; }; //接收到数据后开始重组使用的一个中间缓冲区,多个PacketIn合成一个Packet //发送时从一个Packet分割出多片,不过本程序里直接发送,没有封成Packet class Packet { public : enum { TIMEOUT_LOCKFRAGMENTS = 1000 }; Packet( ); ~Packet(); void reset(); void Set( const CVLInetAddr& iaFrom, UINT nSeqNumber ); BOOL InsertFragment(PacketIn* const pFragment); bool m_bUsed; int recvedpacks; int recvedbytes; int seqnum; BYTE * m_pBuffer; //测试用程序,直接访问了类的变量,没有封装成函数。上述变量正常应该写成 SetXXX ,GetXXX private : BOOL IsFragComplete() const ; ULONG m_ip; USHORT m_port; UINT m_nSeqNumber; vector<int > SeqNumberList; };Packet.cpp
#include "Packet.h" PacketIn::PacketIn(char * lpszBuffer /*=NULL*/ , UINT usBufferSize /*=0*/ , UINT nDataSize /*=0*/ ) { m_lpszBuffer = (BYTE *)lpszBuffer; m_usBufferSize = usBufferSize; //数据最大长度,其实没什么用,已经设置了这个值最大为64000 m_nDataLen = nDataSize; } PacketIn::~PacketIn() { } BOOL PacketIn::Normalize() //判断收到的数据是否有效 { const USHORT usHeaderSize = sizeof (HeadExt); if ( !m_lpszBuffer || m_usBufferSize < usHeaderSize ) //没什么用 { return FALSE; } HeadExt* pHeader = (HeadExt*)( m_lpszBuffer ); if ( pHeader->m_usPayloadSize != m_nDataLen - usHeaderSize ) { return FALSE; } head = *pHeader; if ( pHeader->m_usPayloadSize > 0 ) { memmove( m_lpszBuffer, m_lpszBuffer + usHeaderSize, pHeader->m_usPayloadSize ); } return TRUE; } / //这里是重组数据的临时缓冲,只看这里必然迷惑,先看socket收数据那里,再回来查看 void Packet::Set( const CVLInetAddr& iaFrom, UINT nSeqNumber ) { m_iaFrom = iaFrom; m_nSeqNumber = nSeqNumber; if (m_pBuffer) delete []m_pBuffer; } void Packet::reset() { m_bUsed = false ; recvedpacks = 0; seqnum = 0; recvedbytes = 0; m_pBuffer = NULL; m_pBuffer = new BYTE [64000]; SeqNumberList.clear(); } Packet::Packet() { //calculate the ttl //m_nTTL = RUDP_REASSEMBLE_TIMEOUT*CRudpPeer::PR_SLOWHZ; reset(); } Packet::~Packet() { if (m_pBuffer) delete []m_pBuffer; } BOOL Packet::InsertFragment(PacketIn* const pFragment) { int nSize = SeqNumberList.size(); for ( int i = 0; i< nSize ;i ++) { if (nSize ==SeqNumberList[i] ) //收到重复数据包 { return FALSE; } } SeqNumberList.push_back(pFragment->head.m_nFragmentIndex); memcpy( m_pBuffer + pFragment->head.m_usFragOffset, pFragment->m_lpszBuffer, pFragment->head.m_usPayloadSize); recvedbytes += pFragment->head.m_usPayloadSize; recvedpacks++; m_bUsed = true ; CString str; str.Format("收到数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d, m_usFragOffset %d,m_nTotalSize %d,recvedbytes %d/r/n", pFragment->head.m_nSeqNumber,pFragment->head.m_nTotalFragment, pFragment->head.m_nFragmentIndex,pFragment->head.m_usFragOffset, pFragment->head.m_nTotalSize,pFragment->head.m_usPayloadSize); OutputDebugString(str); if (recvedbytes == pFragment->head.m_nTotalSize ) return TRUE; return FALSE; }发送时的操作:
#iclude "packet.h" const int RTP_SPLIT_PACKSIZE = 1300; //udp一般最大包为1500,一般这里都设置为1400,当然1300也没错 LONGLONG index = rand(); //帧序数,从一个随机数开始,我机器上生成的是41 //分包发送,逻辑相当简单,按最常规的方法分割发送 void Send( BYTE * pBuff, UINT nLen, ULONG ip, USHORT port) { HeadExt head; strcpy(head.flag ,"aaaa" ); head.m_nSeqNumber = index++; //帧序数 head.m_nTotalFragment = nLen/RTP_SPLIT_PACKSIZE; head.m_nTotalFragment += nLen%RTP_SPLIT_PACKSIZE?1:0; //整除的话就是0,否则多出一个尾数据 head.m_nFragmentIndex = 0;//第一个分段从0开始 head.m_nTotalSize = nLen; head.m_bLastFragment = 0; head.m_usFragOffset = 0; char tem[RTP_SPLIT_PACKSIZE+ sizeof (HeadExt)]; if (head.m_nTotalFragment == 1) { memcpy(tem,&head,sizeof (HeadExt)); memcpy(tem+sizeof (HeadExt),pBuff,nLen); head.m_usPayloadSize = nLen; //用UDP发送,常规做法,先对UDP做一个封装的类,这里调用类对象。本代码只说明原理,类似伪代友 sendto(tem,nLen+sizeof (HeadExt),ip,port); CString str; str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d, m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n" , head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex, head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize); OutputDebugString(str); return ; } int i = 0; for ( i = 0; i < head.m_nTotalFragment-1; i++) //开始分割,最后一个单独处理 { head.m_bLastFragment = 0; head.m_nFragmentIndex = i; head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE; head.m_usPayloadSize = RTP_SPLIT_PACKSIZE; memcpy(tem,&head,sizeof (HeadExt)); memcpy(tem+sizeof (HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,RTP_SPLIT_PACKSIZE); sendto(tem,nLen+sizeof (HeadExt),ip,port); CString str; str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d, m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n" , head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex, head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize); OutputDebugString(str); } head.m_bLastFragment = 1; head.m_nFragmentIndex = i; head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE; head.m_usPayloadSize = nLen - (i*RTP_SPLIT_PACKSIZE); memcpy(tem,&head,sizeof (HeadExt)); memcpy(tem+sizeof (HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,nLen - (i*RTP_SPLIT_PACKSIZE)); sendto(tem,nLen+sizeof (HeadExt),ip,port); CString str; str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d, m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n" ,head.m_nSeqNumber, head.m_nTotalFragment,head.m_nFragmentIndex,head.m_usFragOffset, head.m_nTotalSize,head.m_usPayloadSize); OutputDebugString(str); }接收数据的回调 //代码很简单,收到数据后,形成PacketIn,然后找一个链表,插进去 //因为收到的数据的顺序是不定的,所以先创建一个链表,每帧首数据到来时,分配一个节点,其他分片到来的时候,都加到//这个节点里,等数据都到齐了,这个节点就是一个完整的包,回调出去。
void RecvCallback( BYTE * pBuff, UINT nLen, ULONG ip, USHORT port) { if (nLen < sizeof (HeadExt)) return ; HeadExt* pHead = (HeadExt*)pBuff; CString str = pHead->flag; if (str != "aaaa" ) { return ; } if (pHead->m_nTotalFragment == 1) //只有一帧,不分片 { //回调,上层处理 if (m_pfDispatch) m_pfDispatch(pBuff+sizeof (HeadExt),nLen- sizeof (HeadExt),ip,port,m_lpParam); return ; } PacketIn data( (char *)pBuff, 64000, nLen ); if ( !data.Normalize() ) return ; if ( data.head.m_nTotalFragment>1 ) { Packet* pTemp = GetPartialPacket( iaRemote, data.head.m_nSeqNumber ); if ( pTemp ) { if ( pTemp ->InsertFragment( &data ) ) { m_pfDispatch(pTemp ->m_pBuffer, pTemp ->recvedbytes,ip,port,m_lpParam); } }//end of if } }//上面用到的一些变量可以在一个.h里面定义,比如: Packet TempPacket[16];//分配一个数组,每个节点是一个重组后临时缓冲 Packet* GetPartialPacket(const CVLInetAddr& iaFrom, UINT nSeqNumber);//从数组里取出一个缓冲节点
//根据这几个关键字查找,不过只用 //到了nSeqNumber,要是3人以上的视频聊天,ip和port是必要的 Packet* GetPartialPacket(const ULONG ip, USHORT port, UINT nSeqNumber) { Packet* tmp = NULL; int i=0; while (tmp==NULL && i<16) { //该包所属的帧已有其它数据包到达 if (TempPacket[i].seqnum==nSeqNumber && TempPacket[i].recvedpacks>0) { tmp = &TempPacket[i]; break ; } i++; } if (tmp == NULL) //新的帧 { //查找空闲元素 for (i=0;i<16;i++) { if (!TempPacket[i].m_bUsed) break ; } if (i>=16) { //没有空闲的元素,丢掉一个最早 tmp = &TempPacket[0]; int j = 0; for (i=1;i<16;i++) { if (TempPacket[i].seqnum < tmp->seqnum) { tmp = &TempPacket[i]; j = i; } } //找到最早的一帧 if (tmp->m_pBuffer) { delete []tmp->m_pBuffer; tmp->reset(); } } else tmp = &TempPacket[i]; } InsertFragment tmp->m_bUsed = true ; tmp->seqnum = nSeqNumber; return tmp; }整个示例最重要的是两个函数: GetPartialPacket:取到一个临时缓冲节点 InsertFragment:把收到的每个数据段插入临时缓冲组成一个完整帧 当然发送方也要按一定规则分割发送。