熟悉完成端口的请进。 ( 积分: 100 )

  • 主题发起人 主题发起人 9278707
  • 开始时间 开始时间
9

9278707

Unregistered / Unconfirmed
GUEST, unregistred user!
我的应用服务器用的是完成端口模型,现在遇到几个问题:
1、当客户端连接上之后又断开,重新连接,发生错误提示:Socket Error #10061 Connection refused.
2、如果客户端以外断开,应用服务器如何获得客户端断开信息?

望有过完成端口开发经验的高手赐教!
 
我的应用服务器用的是完成端口模型,现在遇到几个问题:
1、当客户端连接上之后又断开,重新连接,发生错误提示:Socket Error #10061 Connection refused.
2、如果客户端以外断开,应用服务器如何获得客户端断开信息?

望有过完成端口开发经验的高手赐教!
 
怎么没人回答?
没办法点名了Another_eYes,barton,张无忌三位高人请进!
 
问题1:你的接受连接请求处理有问题
  2:这个有多种方案,心跳包是一种方案。另一个设超时也可以
 
第二个问题我决定用设置超时,刚才试了一下,5000000条记录检索一遍几乎不费时间。
代码如下:
unit CompletePort;


interface

uses
Windows, WinSock2, Dialogs;

Const
DATA_BUFSIZE = 10;
RECV_POSTED = 0;
SEND_POSTED = 1;

Type
LPPER_HANDLE_DATA = ^PER_HANDLE_DATA;

PER_HANDLE_DATA = record //用于存放套接字句柄
Socket: TSocket;
end;


LPPER_IO_OPERATION_DATA = ^PER_IO_OPERATION_DATA;

PER_IO_OPERATION_DATA = record //用于存放跟套接字句柄有关的信息
Overlapped: OVERLAPPED;
DataBuf: WSABUF;
Buffer: array[0..DATA_BUFSIZE] of CHAR;
OperationType: byte;
// other useful information
end;

function StartServer: Integer;//开启服务

function ListenThread(params: pointer): DWORD;//监听线程

function SerVerWorkerThread(params: pointer): DWORD;//工作线程

var
hIocp: THANDLE;//完成端口句柄
sizeofPHD,sizeofPID: Integer;
SystemInfo: SYSTEM_INFO;//作用:获得Cpu个数
Listen: TSOCKET;//监听套接字
InternetAddr: sockaddr_in;
IsExit: boolean;
IsRunning: boolean;

implementation

function StartServer:Integer;
var
v_WSAData: TWSADATA;
I: Cardinal;
ThreadID,ThreadHandle: Cardinal;
begin
//Step1初始化WinSock2
if WSAStartup(MAKEWORD(2, 2), v_WSAData) <> 0 then
begin
StartServer := 1;//不能初始化Winsock2
Exit;
end;

sizeofPHD := sizeof(PER_HANDLE_DATA);
sizeofPID := sizeof(PER_IO_OPERATION_DATA);

//Step2创建完成端口(先判断系统处理器个数)
GetSystemInfo(SystemInfo);
hIocp := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, SystemInfo.dwNumberOfProcessors);
//这里最后一个参数是每次有效的最大工作线程的数量,设为0时表示等于处理器个数
if hIocp = 0 then
begin
StartServer := 2;//不能创建完全端口
Exit;
end;

//Step3创建工作线程,根据2中得到的处理器信息,在完成端口上为已完成的I/O请求提供服务
for I := 0 to SystemInfo.dwNumberOfProcessors - 1 do
begin
ThreadHandle := CreateThread(nil,0, @SerVerWorkerThread, Pointer(hIocp), 0, ThreadID);
CloseHandle(ThreadHandle);
end;

//Step4准备好一个监听套接字,监听进入的连接请求
Listen := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
InternetAddr.sin_family := AF_INET;
InternetAddr.sin_addr.s_addr := htonl(INADDR_ANY);
InternetAddr.sin_port := htons(5150);
bind(Listen, @InternetAddr, sizeof(InternetAddr));
//Step5启动侦听线程
ThreadHandle := CreateThread(nil, 0, @ListenThread, nil, 0, ThreadID);
if ThreadHandle = 0 then
begin
StartServer := 5; //侦听线程启动失败
Exit;
end;
CloseHandle(ThreadHandle);
//ListenThread(nil);
IsRunning := True;
StartServer := 0;//开始工作了
end;

//////////监听线程/////////////////////////
function ListenThread(params: pointer): DWORD;
var
PerHandleData: LPPER_HANDLE_DATA; //用于存放套接字句柄
PerIoData: LPPER_IO_OPERATION_DATA; //用于存放跟套接字句柄有关的信息
RecvBytes: Cardinal;
Flags: Cardinal;
Accept: TSOCKET;
begin
Winsock2.listen(Listen, 5);
IsExit := FALSE;
while(TRUE) do
begin
if IsExit = TRUE then
begin
break;
end;
//step5 使用accept函数 接受进入的连接请求
Accept := WSAAccept(Listen, nil, nil, nil, 0);
//为每个客户连接分配一个每连接数据块
GetMem(PerHandleData, sizeofPHD);
//printf("Socket number %d connected/n",Accept);//原文的这句我不知道是做什么用的
PerHandleData.Socket := Accept;
//将套节字与完成端口联系
CreateIoCompletionPort(Accept, hIocp, DWORD(PerHandleData), 0);
//准备接收数据
Flags := 0;
GetMem(PerIoData, sizeofPID);
PerIoData.DataBuf.len := DATA_BUFSIZE;
PerIoData.DataBuf.buf := PerIoData.Buffer;
PerIoData.OperationType := RECV_POSTED;
WSARecv(Accept, @PerIoData.DataBuf, 1, RecvBytes,
Flags, @_OVERLAPPED(PerIoData.Overlapped), nil);
//这句命名执行了,但在工作线程中就是收不到??????????????
end;
//Step6结束
IsRunning := False;
WSACleanup();
end;

///////////工作线程///////////////////////
function SerVerWorkerThread(params:pointer):DWORD;
var
PerHandleData: LPPER_HANDLE_DATA;
PerIoData: LPPER_IO_OPERATION_DATA;
BytesTransferred: DWORD; //存放完成一次I/O操作后,接受实际传入的字节数
RecvBytes, Flags: Cardinal;
ret: LongBool;

Send: TSocket;
begin
while (True) do
begin
ret := GetQueuedCompletionStatus(hIocp, BytesTransferred, DWORD(PerHandleData),
POVERLAPPED(PerIoData), INFINITE);
//执行到以上这一句,该线程就挂起了,不再动了,导致以下代码永不执行!!!!!!!??????
if not ret then
begin
//读或写操作以失败告终
continue;
end;
if(BytesTransferred = 0) and
((PerIoData.OperationType = RECV_POSTED) or
(PerIoData.OperationType = SEND_POSTED)) then
begin
//客户端已经断开套节字,释放本地套节字
closesocket(PerHandleData.Socket);
//执行了closesocket之后,所有操作都将完成
FreeMem(PerHandleData, sizeofPHD);
FreeMem(PerIoData, sizeofPID);
//GlobalFree(PerHandleData);
//GlobalFree(PerIoData);
IsExit := TRUE;//这是我做实验用的,真实的程序肯定不是在这里终结
continue;
end;
if PerIoData.OperationType = RECV_POSTED then
begin
//读取,这也是实验用的,真实的程序还有很多处理,包括数据库操作
showmessage(PerIoData.DataBuf.buf);
//fputs(PerIoData->DataBuf.buf,fp);
WSASend()
WSASend(PerHandleData.Socket,@PerIoData.DataBuf,)

end;
Flags := 0;
ZeroMemory(@PerIoData.Overlapped, sizeof(OVERLAPPED));
PerIoData.DataBuf.len := DATA_BUFSIZE;
PerIoData.DataBuf.buf := PerIoData.Buffer;
PerIoData.OperationType := RECV_POSTED;
WSARecv(PerHandleData.Socket, @PerIoData.DataBuf, 1, RecvBytes,
Flags, @_OVERLAPPED(PerIoData.Overlapped), nil);
end;
end;

end.
 
呵呵,一切皆有可能
 
吐血!leaber我想扁你!
 
socket C/S结构有办法;
完成端口不太明白
 
leaber:不要生气,开个玩笑,欢迎大家继续讨论!
 
//step5 使用accept函数 接受进入的连接请求
Accept := WSAAccept(Listen, nil, nil, nil, 0);
应该是接收这里出问题! WSAAccept 这个我也不怎么会用它!
我使用这个接收涵数不定时出现访问地址错误! 郁闷. 我
更换为accept 就没有问题! 但就觉得不爽> >
 
查了查才发现我的那个Winsocket2 有问题! 不知道楼主能否发一份
winsocket2 给我! 谢谢. 邮箱: xhzlove2000@yahoo.com.cn
 
对于断开的socket要close它。另外,在处理线程中如果你通过GetQueuedCompletionStatus得到的bytes=0,那么也认为该socket是断开了,也要close他地。呵呵。
 
unit uIOCompletPort;

interface
uses
WinSock2, Classes, windows, inifiles, Forms, SysUtils;
const
MAX_BUFSIZE = 4096; {最大缓冲}
MAX_QUEUESIZE = 1000; {最大监听队列}
RECV_POSTED = 0; {接受}
SEND_POSTED = 1; {发送}
SHUTDOWN_FLAG = $FFFFFFFF; {线程退出标志}
type
{服务端口}
TSocketPort = Record
GpsPort:Integer;
BusPort:Integer;
End;
{单句柄数据 和 单IO数据}
PPerHandledata = ^TPerHandledata;
TPerHandledata = record
Overlapped: OVERLAPPED;
ClientSocket: TSocket; {接入客户端Socket}
WSABuffer: WSABUF;
DataBuf: array[0..MAX_BUFSIZE - 1] of char;
OperType: Integer; {IO操作类型}
SmsCardNo: String; {车载序列号}
AccTiem: Cardinal; {开始接收时间}
IsUse: Boolean; {是否使用}
end;
TReadSocketEvent = procedure(Data: Pointer; Count: Integer; Socket: TSocket) of object;
TIOCPPORT = class(TObject)
private
procedure GetSocketPort;
public
constructor Create;
destructor Destroy; override;
function StartServer: Integer;
end;
var
g_StopThread: Boolean; {停止接收线程}
g_SocketPort: TSocketPort; {端口记录}
g_Lock: TRTLCriticalSection;
g_phioList: TList; {单句柄和单IO队列}
g_hIocp: THandle; {完成端口句柄}
g_Listen: TSocket; {监听Socket}
g_internetAddr: TSockAddr;
ReadSocketEvent: TReadSocketEvent;
procedure Lock;
procedure UnLock;
function AlloPerHandledata: PPerHandledata;
procedure AcceptThread(key: Pointer); stdcall; {接收线程}
procedure ServerWorkThread(key: Pointer); stdcall;{工作线程}
implementation

procedure Lock;
begin
EnterCriticalSection(g_Lock);
end;
procedure UnLock;
begin
LeaveCriticalSection(g_Lock);
end;

function AlloPerHandledata: PPerHandledata;
var
I: Integer;
PerHandle: PPerHandledata;
begin
PerHandle := nil;
for I := g_phioList.Count - 1 downto 0 do
begin
PerHandle := g_phioList;
if not PerHandle.IsUse then Break;
end;
if not Assigned(PerHandle) or (PerHandle.IsUse) then
begin
New(PerHandle);
ZeroMemory(PerHandle, sizeof(TPerHandledata));
PerHandle.ClientSocket := INVALID_SOCKET;
g_phioList.Add(PerHandle);
end;
Result := PerHandle;
end;

{接收线程}
procedure AcceptThread(key: Pointer); stdcall;
var
PerHandle: PPerHandledata; {单句柄数据}
ClientAddr: TSockAddr;
ClientAddrSize: Integer;
RecvBytes: Cardinal;
Flags: Cardinal;
AcceptSocket: TSOCKET;
ErrCode: Integer;
begin
while true do
begin
if g_StopThread then Break; {通知结束}
ClientAddrSize := sizeof(TSockAddr);
AcceptSocket := WinSock2.WSAAccept(g_Listen, @ClientAddr, @ClientAddrSize, nil, 0);
if AcceptSocket = INVALID_SOCKET then Continue;
PerHandle := AlloPerHandledata;
PerHandle.ClientSocket := AcceptSocket;
PerHandle.AccTiem := GetTickCount;
{将套接字跟完成端口关联}
if CreateIoCompletionPort(AcceptSocket, g_hIocp, DWORD(PerHandle), 0) = 0 then
begin
if Assigned(PerHandle) then
begin
PerHandle.ClientSocket := INVALID_SOCKET;
PerHandle.IsUse := False;
end;
Continue;
end;
{投递第一次接收}
Flags := 0;
ZeroMemory(@PerHandle^.Overlapped, sizeof(OVERLAPPED));
ZeroMemory(@PerHandle^.DataBuf, sizeof(PerHandle^.DataBuf));
PerHandle^.WSABuffer.len := MAX_BUFSIZE;
PerHandle^.WSABuffer.buf := PerHandle^.DataBuf;
PerHandle^.OperType := RECV_POSTED;
PerHandle.IsUse := True;
if WSARecv(PerHandle^.ClientSocket, @PerHandle^.WSABuffer, 1, RecvBytes,
Flags, @PerHandle^.Overlapped, nil) = SOCKET_ERROR then
begin
ErrCode := WSAGetLastError;
if ErrCode <> ERROR_IO_PENDING then
begin
closesocket(PerHandle^.ClientSocket);
PerHandle^.ClientSocket := INVALID_SOCKET;
PerHandle^.IsUse := False;
end;
end;
end; //while
end;

{工作线程}
procedure ServerWorkThread(key: Pointer); stdcall;
var
PerHandle: PPerHandledata;
PerIoData: PPerHandledata;
BytesTransferred: DWORD; {存放完成一次I/O操作后,接受实际传入的字节数}
RecvBytes, Flags: Cardinal;
ErrCode, I: Integer;
begin
while true do
begin
PerHandle := nil;
PerIoData := nil;
BytesTransferred := 0;
if not GetQueuedCompletionStatus(g_hIocp, BytesTransferred,
DWORD(PerHandle), POverLapped(PerIoData), INFINITE) then
begin
if Assigned(PerHandle) then
begin
closesocket(PerHandle^.ClientSocket);
PerHandle.ClientSocket := INVALID_SOCKET;
PerHandle.IsUse := False;
end;
Continue;
end;
if BytesTransferred = 0 then
begin
if Assigned(PerHandle) then
begin
closesocket(PerHandle^.ClientSocket);
PerHandle.ClientSocket := INVALID_SOCKET;
PerHandle.IsUse := False;
end;
Continue;
end;
if Cardinal(PerIoData) = SHUTDOWN_FLAG then Break; {通知结束}
PerHandle^.AccTiem := GetTickCount; {接收数据}
if PerIoData.OperType = RECV_POSTED then
ReadSocketEvent(@PerIoData.DataBuf, BytesTransferred, PerHandle^.ClientSocket);
{继续投递接收}
Flags := 0;
RecvBytes := 0;
ZeroMemory(@PerHandle^.Overlapped, sizeof(OVERLAPPED));
ZeroMemory(@PerHandle^.DataBuf, sizeof(PerHandle^.DataBuf));
PerHandle^.WSABuffer.len := MAX_BUFSIZE;
PerHandle^.WSABuffer.buf := PerHandle^.DataBuf;
PerHandle^.OperType := RECV_POSTED;
PerHandle^.IsUse := True;
if WSARecv(PerHandle^.ClientSocket, @PerHandle^.WSABuffer, 1, RecvBytes, Flags,
@PerHandle^.Overlapped, nil) = SOCKET_ERROR then
begin
ErrCode := WSAGetLastError;
if ErrCode <> ERROR_IO_PENDING then
begin
closesocket(PerHandle^.ClientSocket);
PerHandle^.ClientSocket := INVALID_SOCKET;
PerHandle^.IsUse := False;
end;
end;
end; //while
end;

{ TIOCPPORT }

constructor TIOCPPORT.Create;
begin
inherited Create;
g_phioList := TList.Create;
g_StopThread := False;
end;


destructor TIOCPPORT.Destroy;
var
I: Integer;
PerHandle: PPerHandledata;
begin
closesocket(g_Listen);
for I := g_phioList.Count - 1 downto 0 do
begin
PerHandle := g_phioList;
if PerHandle^.ClientSocket <> INVALID_SOCKET then
begin
closesocket(PerHandle^.ClientSocket);
PerHandle^.ClientSocket := INVALID_SOCKET;
PerHandle^.IsUse := False;
end;
Dispose(PerHandle);
end;
g_phioList.Clear;
g_phioList.Free;
PostQueuedCompletionStatus(g_hIocp, 0, 0, Pointer(SHUTDOWN_FLAG));
g_StopThread := True;
Sleep(20);
CloseHandle(g_hIocp);
WSACleanup;
inherited;
end;

procedure TIOCPPORT.GetSocketPort;
var
InifPath: String;
inifile: TIniFile;
begin
InifPath := ExtractFilePath(Application.ExeName);
inifile := TIniFile.Create(InifPath + 'SysConfig.ini');
try
g_SocketPort.GpsPort := inifile.ReadInteger('SOCKET SET', 'GpsPort', 5150);
g_SocketPort.BusPort := inifile.ReadInteger('SOCKET SET', 'BusPort', 5160);
finally
inifile.Free;
end;
end;

function TIOCPPORT.StartServer: Integer;
var
I: Integer;
WSADATA: TWSAData ;
sysInfo: SYSTEM_INFO;
Thrhandle: THandle;
ThreadID: Cardinal;
begin
{初始WinSocket2}
if WSAStartup(MakeWord(2, 2), WSADATA) <> 0 then
begin
Result := 1; {初始WinSocket2失败}
Exit;
end;
{初始完成端口句柄}
g_hIocp := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 2);
if g_hIocp = 0 then
begin
Result := 2; {初始完成端口句柄错误}
WSACleanup;
Exit;
end;
{获取系统CPU数量}
GetSystemInfo(sysInfo);
{创建工作器线程}
for I := 0 to sysInfo.dwNumberOfProcessors - 1 do
begin
Thrhandle := CreateThread(nil, 0, @ServerWorkThread, nil, 0, ThreadID);
CloseHandle(Thrhandle);
end;
{获取端口}
GetSocketPort;
{创建监听套接字}
g_Listen := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
g_internetAddr.sin_family := AF_INET;
g_internetAddr.sin_addr.S_addr := htonl(INADDR_ANY);
g_internetAddr.sin_port := htons(g_SocketPort.BusPort);
bind(g_Listen, @g_internetAddr, sizeof(g_internetAddr));
{让套接字为监听做好准备}
listen(g_Listen, MAX_QUEUESIZE);
{创建接收线程}
Thrhandle := CreateThread(nil, 0, @AcceptThread, nil, 0, ThreadID);
CloseHandle(Thrhandle);
end;

initialization
InitializeCriticalSection(g_Lock);
finalization
DeleteCriticalSection(g_Lock);
end.
// 谢谢楼主给我发WinSocket2, 这是我近来学习写的
//那出来相互学习吧
 
kk2000,能否把你的例子给我发一个/?
服务器端采用完成端口模式,客户端采用那种模式比较合适呀
 
我也是刚学的! 我的例子就这个了! 客户断采取什么模式好.我也不好说.
应该大多数是使用非阻塞的吧!
 
你的例子基本上跟书上的差不多,我的例子马上就完成了,客户端采用WSAAsyncSelect模式
完成了再放上来
 
当然啦! 因为我是刚学习的. 基本上是跟书上的一样了!
期待你的例子.
 
多人接受答案了。
 
后退
顶部