不管理你用什么组件都是要处理数据包,如不加以控制,很难处理超大的数据量。
TCP数据包问题 :
1.可以保证数据包的先后顺序(因为是面向连接的),但有可能拆包或粘包,
数据包边界没有控制,可以保证数据质量(长度)是正确的。
2.设计TCP数据包最好要定义本次包的长度,以便读取相应长度的数据段。
3.因为不可能保证每包的边界,所以不能按照每接收一次为一个包。
4.将接收的包缓存,再在缓存中分析包,计算CRC, 并取出来相应长度处理。
设计TCP数据包: 以下可以理解为一个结构,当然其中还要加其规则可以更改。
数据长度: 2字节(Word); TCP因为最大传输为65535
数据校验码: 2字节(WORD); CRC校验,可以不用校验,因为不会丢包。
因为要加密,所以还是要的好。
数据流的类型: 1字节(Byte); MSG_DATA or MSG_ENCODE or MSG_TEXT 标识加密
数据段: N字节,根据实际情况来定。
前5字节为包头,后面的字节为真实数据;
在读取时,根据包前面的长度,读取数据段。这样顺序下来就不会错误。
以下是我的一个组件里面的组包发送和接收的方法,发送超大文件都没有问题。
{ TCP数据包传输格式 4096,传输一次
数据长度: 4字节(Int);
数据校验码: 2字节(WORD); CRC校验,可以不用校验,因为不会丢包。
因为要加密,所以还是要的好。
数据流的类型: 1字节(Byte); MSG_DATA or MSG_ENCODE or MSG_TEXT
数据段: N字节,根据实际情况来定。
前7字节为包头,后面的字节为真实数据;
const
//数据流类型
IP_MSG_TXT = 1; //命令文本
IP_MSG_BIN = 2; //数据流
IP_MSG_TRN = 3; //中转数据流;
type
TSocetDataType = (msgText, msgBin, Msg_Trn);
}
function TCustomSocket.TCPSendEncode(ASocket: TCustomSocket;
AData: TStream; const DataType: TSocketDataType = msgText): Integer;
const
ADataType: array [TSocketDataType]of Byte = (IP_MSG_TXT, IP_MSG_BIN, IP_MSG_TRN);
var
Buf: array[0..MAX_BUFSIZE - 1] of Byte;
P: PChar;
CRCCode: WORD;
I: Integer;
ReadNum: Integer;
Offset: Integer;
Size: Integer;
begin
Result := -1;
if not Assigned(AData) then Exit;
if not FActive or (AData.Size < 1) then Exit;
if AData.Size >= High(Integer) then begin
ASocket.DoException(ASocket, 'TCPSendEncode: Data.Size overlong error.');
Exit;
end;
AData.Position := 0;
//加密后的长度与加密前相同
try
CRCCode := EncodeStream(AData);
if (CRCCode = $FFFF) or (AData.Size = 0) then
begin
ASocket.DoException(ASocket, 'TCPSendEncode: EncodeStream error.');
Exit;
end;
FillChar(Buf, SizeOf(Buf), #0);
P := @Buf;
//数据长度
PInteger(P)^ := Integer(AData.Size + 7){包头长度7};
//数据校验码
PWord(@P[4])^ := CRCCode;
//数据流类型
P[6] := Chr(ADataType[DataType]);
AData.Position := 0;
ReadNum := AData.Read(P[7], SizeOf(Buf) - 7);
if (ReadNum = 0 ) then
begin
ASocket.DoException(ASocket, 'TCPSendEncode: Read ReadNum = 0.');
Exit;
end;
if Assigned(OnBeginWork) then OnBeginWork(ASocket, AData.Size);
try
if ASocket is TClientSocket then begin
if TClientSocket(ASocket).ClientWrite(P^, ReadNum + 7) = SOCKET_ERROR then Exit;
end
else if ASocket is TServerClientSocket then begin
if TServerClientSocket(ASocket).Write(P^, ReadNum + 7) = SOCKET_ERROR then Exit;
end
else Exit;
//循环读取数据包并发送出去,每包4096字节
while AData.Position <> AData.Size do begin
FillChar(Buf, SizeOf(Buf), #0);
ReadNum := AData.Read(Buf, SizeOf(Buf));
if ASocket is TClientSocket then begin
if TClientSocket(ASocket).ClientWrite(Buf, ReadNum) = SOCKET_ERROR then Exit;
end
else if ASocket is TServerClientSocket then begin
if TServerClientSocket(ASocket).Write(Buf, ReadNum) = SOCKET_ERROR then Exit
end
else Exit;
if Assigned(OnWorking) then OnWorking(ASocket, AData.Size);
end; // while do
Result := AData.Size + 7;
finally
if Assigned(OnEndWork) then OnEndWork(ASocket);
end;
except on E: Exception do
ASocket.DoException(ASocket, 'TCPSendEncode: ' + E.Message);
end;
end;
function TCustomSocket.TCPRecvDecode(ASocket: TCustomSocket;
ABuffer: PChar; ABufferLength: Integer): Integer;
var
Msg: string;
TotalLength: Integer;
CRCCodeB, CRCCodeA: Integer;
DataType: TSocketDataType;
AData: TStream;
AReply: TStream;
begin
TotalLength := ABufferLength;
try
SetString(Msg, ABuffer, TotalLength);
if not TCPCheckRecvFinish(ASocket.Identity, ABuffer, TotalLength, Msg) then begin
DoException(self, 'RecvDecode for TCPCheckRecvFinish = false');
Exit;
end;
Move(Msg[1], ABuffer^, TotalLength); //还原原来的封包
Msg := '';
//校验码
CRCCodeA := PWORD(@ABuffer[4])^;
//数据流类型
DataType := TSocketDataType(Ord(ABuffer[6]));
CRCCodeB := NewDecode(@ABuffer[7], TotalLength - 7);
if CRCCodeA <> CRCCodeB then
begin
DoException(self,
Format('RecvDecode for CRCCode error. CRC1 <> CRC2: [%d, %d], Len: [%d] ',
[CRCCodeA, CRCCodeB, TotalLength - 7]));
Exit;
end;
//这里改为直接Socket 读数据就行,不再查找Socket读
AData := TMemoryStream.Create;
AReply := TMemoryStream.Create;
try
TotalLength := AData.Write(ABuffer[7], TotalLength - 7);
AData.Position := 0;
if TotalLength > 0 then begin
if Assigned(OnReadReply) then OnReadReply(ASocket, AData, AReply, DataType);
if AReply.Size > 0 then
TCPSendEncode(ASocket, AReply, DataType); //这里只回复
end;
finally
if Assigned(AData) then FreeAndNil(AData);
if Assigned(AReply) then FreeAndNil(AReply);
end;
except on E: Exception do
ASocket.DoException(ASocket, '' + E.Message);
end;
end;
//TCP 检查接收包是否完整
function TCustomSocket.TCPCheckRecvFinish(AIdentity: Integer; ABuffer: PChar;
var TotalLength: Integer; var AMsg: string): Boolean;
var
bFind: Boolean;
i: Integer;
AFCUDPBag: TTCPBag;
TempTotalLen: Integer;
CurPart: Integer;
begin
bFind := False;
Result := False;
EnterCriticalSection(FSliceBagLock);
try
for I := FSliceBagList.Count - 1 downto 0 do
begin
AFCUDPBag := TTCPBag(FSliceBagList);
//如果收了一半没有发剩下的,不就死掉了, 还是要为一个包的建立超时
//不是删除掉,而是清空它
if ((GetTickCount - AFCUDPBag.LastRecvTime) > 20000) //超过20s的,认为超时
{ and (AFCUDPBag.FCurrentLength > 0) and (AFCUDPBag.FTotalLength > 0)} then
begin
AFCUDPBag.ClearCurrentTotalLength; //清空上一次的包
if AFCUDPBag.Identity <> AIdentity then
Continue;
end;
//找到了该包, 这里是为每个客户端建立一个缓存包
if AFCUDPBag.Identity = AIdentity then begin
bFind := True;
AFCUDPBag.LastRecvTime := GetTickCount;
AFCUDPBag.RecvABag(AMsg, TotalLength); //接收一个包
if AFCUDPBag.RecvFinish then //如是完成
begin
AMsg := AFCUDPBag.RecvText; //返回结果AMsg 给用户
TotalLength := AFCUDPBag.FTotalLength;
//不是删除掉,而是清空它
AFCUDPBag.ClearCurrentTotalLength;
Result := True;
Break;
end
else begin //还没接受完毕,等待下一次接受
Exit;
end;
end // if AFCUDPBag.Identity = AIdentity then begin
end; //end for
if not bFind then begin //没找到,则新增一条进去
//注意第一次接收可能会是完整的包,就退出,
TempTotalLen := PInteger(ABuffer)^;
if TotalLength = TempTotalLen then
begin
Result := True;
//是个完整的数据包
Exit;
end;
AFCUDPBag := TTCPBag.Create;
AFCUDPBag.LastRecvTime := GetTickCount;
AFCUDPBag.FIdentity := AIdentity;
AFCUDPBag.RecvABag(AMsg, TotalLength);
FSliceBagList.Add(AFCUDPBag);
end;
finally
LeaveCriticalSection(FSliceBagLock);
end;
end;