我已经开了100个线程,为什么效率很低? ( 积分: 120 )

  • 主题发起人 牛头12236
  • 开始时间
线程不要太多!
 
我不知道你客户端是如何跟控制站连接的,假设TCP或者命名管道吧
那么你可以用完成端口架构,有少数线程负责读取客户端数据并把它放入队列
因为你根据脚本更新SQLServer的数据,这个处理时间比较长,如果你只有一个
与数据库的连接那就没必要有多个线程了
 
to tseug:
不好意思,你讲的完成端口我没有接触过,可否MSN上见:
LoveKilly@hotmail.com
 
还是没有明白,你的需求到底是什么?
 通迅的发起方是什么
通迅的接受方是什么
传输的数据包是什么
接受方如何处理。
 效率优化哪个部分 是 接收方的 收包部分, 接收方的处理部分,还是接收方的应答部分?

 
 
to djh_djh:
我讲的控制站实际上可以看成是一个SQL Server,但是它与SQL Server不同的地方是控制站在固定的周期内必须按照一定的算法更新控制站内部的数据。而在服务器端最多允许运行100个这样的控制站(同一进程内)。目前我用的是COM+技术来实现客户端与服务器端通信,我正在看<<Windows网络编程>>第一章,我不知道你讲的什么意思,:p
 
线程并不能提高效率。只是能实现并行而已。
因为只有一个CPU。线程实际上是轮流占用CPU的时间片。所以效率不会有提高的。
 
线程并不能提高效率。只是能实现并行而已。
因为只有一个CPU。线程实际上是轮流占用CPU的时间片。所以效率不会有提高的。
转贴 ^^
 
to wr960204, 刘麻子:
对于要运行100个控制站、每一个控制站以500MS为一个周期来更新它自己内部数据的问题,两位有何良策?
 
1.采用线程池
这并不复杂,只是对你的每个线程标记一个状态,然后根据需要创建和删除,当然,好的算法是必要的
2.请参考CreateFiber
这个函数以及一系列相关的函数对你的工作会相当有用.
完成端口好像复杂了些,我想对于你的问题
 
to zjan521:
>> 参考CreateFiber
用纤程?我记得<<Windows核心编程>>书中有介绍过纤程,它上面说最好是用线程,是不是由于效率的原因,zjan521兄能否指点一下
>>完成端口好像复杂了些
只要效率高,复杂没有关系,我可以下功夫。
 
:( 呵呵,大虾们是不是还没有起床?
 
至少有两种方法:
1、如果都是在LAN里面的话,使用UDP的组播协议,直接用WinSocket编程实现。
2、如果是在广域网的话,可以使用上面说的线程池的方式,使用TCP协议,也是直接用WinSocket编程。不过,好的线程池不是那么好写的。
这样,速度方面应快一些,再者自己对出现的BUG进行更好的把握。还有就是对现在的代码进行优化,看看还有没有改进的余地!
 
to 寻路:
路兄可否发一个这方面的例子给我看看,大家都这么热心,我搞不清楚该怎么弄了,呵呵。
 
下面是一个组播的例子,组播的原理是针对同一组内的所有IP地址只需要发送一份数据。这样就大大的降低了你去循环所有IP地址发送数据的情况,关于连接池的实现我没有写过,请自己查找这方面的资料,这个例子比较简单,仅供参考,如果使用的话可能还需要处理数据的丢失、数据包前后不一致等一些情况:
实现类:
unit UdpSocket;
interface
uses
Classes, SysUtils, WinSock, Windows;
const
DEFAULTBUFFERSIZE = 16384;
MAXBUFFERSIZE = 63488;
MULTICAST_TTL = 10;
type
PIP_mreq = ^TIP_mreq;
TIP_mreq = record
imr_multiaddr : in_addr;
imr_interface : in_addr;
end;

ESocketError = class(Exception);
TSockSytle = (MultCastSend, MultCastRecv);
TUdpRecv = procedure(var Buf;
Len: Integer;
FromIP: string;
FromPort: u_Short) of object;
TUcpRecvThd = class(TThread)
private
FSocket : TSocket;
FBufSize : Integer;
FOnUdpRecv : TUdpRecv;
protected
procedure Execute;
override;
end;

TUcpSocket = class(TObject)
private
class procedure StartSocket();
static;
class procedure StopSocket();
static;
private
FOnUdpRecv : TUdpRecv;
FLocalAddr : String;
FPort : u_Short;
FSocket : TSocket;
FAddrTo : TSockAddr;
FStyle : TSockSytle;
FBufSize : Integer;
FRemoteAddr : String;
FMCReq : TIP_mreq;
FUcpRecvThd : TUcpRecvThd;
private
procedure SetLocalAddr(Value: String);
procedure SetPort(Value: u_Short);
procedure SetSytle(Value: TSockSytle);
procedure SetBufSize(Value: Integer);
procedure SetRemoteAddr(Value: String);
public
function Send(var Buf;
Len: Integer): Boolean;
procedure Busk();
published
property LocalAddr: String read FLocalAddr write SetLocalAddr;
property Port: u_Short read FPort write SetPort;
property Style: TSockSytle write SetSytle;
property BufSize: Integer read FBufSize write SetBufSize;
property RemoteAddr: String read FRemoteAddr write SetRemoteAddr;
property OnUdpRecv: TUdpRecv read FOnUdpRecv write FOnUdpRecv;
public
constructor Create();
destructor Destroy;
override;
end;

implementation
{ TUcpSocket }
procedure TUcpSocket.Busk;
var
pPE : PProtoEnt;
Sock : TSocket;
SockAddrLocal, SockAddrRemote : TSockAddr;
nTTL, nReuseAddr : integer;
MCReq : TIP_mreq;
begin
pPE := GetProtoByName('UDP');
Sock := Socket(AF_INET, SOCK_DGRAM, pPE.p_proto);
if Sock = INVALID_SOCKET then
raise ESocketError.Create('创建Socket失败!');
nReuseAddr := 1;
if SetSockOpt(Sock, SOL_SOCKET, SO_REUSEADDR, @nReuseAddr, SizeOf(integer)) = SOCKET_ERROR then
begin
CloseSocket(Sock);
Exit;
end;

FillChar(SockAddrLocal, SizeOf(SockAddrLocal), 0);
SockAddrLocal.sin_family := AF_INET;
if FStyle = MultCastSend then
SockAddrLocal.sin_port := htons(0)
else
SockAddrLocal.sin_port := htons(Port);
SockAddrLocal.sin_addr.S_addr := Inet_Addr(PChar(FLocalAddr));
if Bind(Sock, SockAddrLocal, SizeOf(SockAddrLocal)) = SOCKET_ERROR then
begin
CloseSocket(Sock);
Exit;
end;

if FStyle = MultCastSend then
begin
//设置发送缓冲大小
if SetSockOpt(Sock, SOL_SOCKET, SO_SNDBUF,
@FBufSize, SizeOf(Integer)) = SOCKET_ERROR then
begin
CloseSocket(Sock);
Exit;
end;

//设置发送时的参数
if SetSockOpt(Sock, IPPROTO_IP, IP_MULTICAST_IF, @(SockAddrLocal.sin_addr),
SizeOf(In_Addr)) = SOCKET_ERROR then
begin
CloseSocket(Sock);
Exit;
end;
nTTL := MULTICAST_TTL;
if SetSockOpt(Sock, IPPROTO_IP, IP_MULTICAST_TTL, @nTTL, SizeOf(integer)) = SOCKET_ERROR then
begin
CloseSocket(Sock);
Exit;
end;

FillChar(SockAddrRemote, SizeOf(SockAddrRemote), 0);
SockAddrRemote.sin_family := AF_INET;
SockAddrRemote.sin_port := htons(Port);
SockAddrRemote.sin_addr.S_addr := Inet_Addr(PChar(FRemoteAddr));
FAddrTo := SockAddrRemote;
end else
//接收
begin
//设置接收缓冲大小
if SetSockOpt(Sock, SOL_SOCKET, SO_RCVBUF, @fBufSize, SizeOf(integer)) = SOCKET_ERROR then
begin
CloseSocket(Sock);
Exit;
end;

//加入组
MCReq.imr_multiaddr.S_addr := Inet_Addr(PChar(FRemoteAddr));
MCReq.imr_interface.S_addr := Inet_Addr(PChar(FLocalAddr));
if SetSockOpt(Sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, @MCReq,
SizeOf(TIP_mreq)) = SOCKET_ERROR then
begin
CloseSocket(Sock);
Exit;
end;

fMCReq := MCReq;
end;

FSocket := Sock;
if FStyle = MultCastRecv then
begin
FUcpRecvThd.FSocket := FSocket;
FUcpRecvThd.FBufSize := FBufSize;
FUcpRecvThd.FOnUdpRecv := FOnUdpRecv;
FUcpRecvThd.Resume;
end;
end;

constructor TUcpSocket.Create;
begin
FOnUdpRecv := nil;
FLocalAddr := '127.0.0.1';
FPort := 0;
FStyle := MultCastRecv;
FBufSize := DEFAULTBUFFERSIZE;
FUcpRecvThd := TUcpRecvThd.Create(true);
end;

destructor TUcpSocket.Destroy;
begin
CloseSocket(FSocket);
FUcpRecvThd.Free;
inherited;
end;

function TUcpSocket.Send(var Buf;
Len: Integer): Boolean;
begin
Result := false;
if SendTo(FSocket, Buf, Len, MSG_DONTROUTE, FAddrTo,
SizeOf(FAddrTo)) <> SOCKET_ERROR then
Result := true;
end;

procedure TUcpSocket.SetLocalAddr(Value: String);
begin
FLocalAddr := Value;
end;

procedure TUcpSocket.SetBufSize(Value: Integer);
begin
FBufSize := Value;
end;

procedure TUcpSocket.SetPort(Value: u_Short);
begin
FPort := Value;
end;

procedure TUcpSocket.SetRemoteAddr(Value: String);
var
nMCAddr : Cardinal;
begin
FRemoteAddr := Value;
nMCAddr := ntohl(inet_addr(PChar(FRemoteAddr)));
if not ((nMCAddr <= $efffffff) and (nMCAddr >= $e0000100)) then
raise ESocketError.Create('无效的组播地址!');
end;

procedure TUcpSocket.SetSytle(Value: TSockSytle);
begin
FStyle := Value;
end;

class procedure TUcpSocket.StartSocket;
var
WsData: TWSAData;
err: Integer;
begin
err := WSAStartup(MAKEWORD(2, 2), WsData);
if err <> 0 then
raise ESocketError.Create('不能使用SOCKET服务!');
if ( LOBYTE( WsData.wVersion ) <> 2 ) or
( HIBYTE( WsData.wVersion ) <> 2 ) then
raise ESocketError.Create('没有找到所需要的SOCKET版本!');
end;

class procedure TUcpSocket.StopSocket;
begin
WSACleanup;
end;

{ TUcpRecvThd }
procedure TUcpRecvThd.Execute;
var
readFDs : TFDSet;
nRecved, nAddrLen: integer;
Buf : array [0..MAXBUFFERSIZE] of Byte;
SockFrom : TSockAddr;
begin
Priority := tpHighest;
while not Terminateddo
begin
nAddrLen := SizeOf(SockFrom);
FD_ZERO(readFDs);
FD_SET(FSocket, readFDs);
Select(0, @readFDs, nil, nil, nil);
if FD_ISSET(FSocket, readFDs) then
begin
nRecved := RecvFrom(FSocket, buf, FBufSize, 0, SockFrom, nAddrLen);
if Assigned(FOnUdpRecv) then
FOnUdpRecv(Buf, nRecved, string(Inet_Ntoa(SockFrom.sin_addr)),
Cardinal(Ntohs(SockFrom.sin_port)));
end;
end;
end;

initialization
TUcpSocket.StartSocket;
finalization
TUcpSocket.StopSocket;
end.

例子:
unit Demo;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, UdpSocket, WinSock;
const
MULTCASTADDR: String = '225.0.1.177';
MULTCASTPORT: Integer = 10000;
type
TUdpSocketDemo = class(TForm)
edtSendText: TEdit;
meoRecvText: TMemo;
cmdSend: TButton;
cmdInit: TButton;
cmdExit: TButton;
procedure FormClose(Sender: TObject;
var Action: TCloseAction);
procedure cmdExitClick(Sender: TObject);
procedure cmdSendClick(Sender: TObject);
procedure cmdInitClick(Sender: TObject);
private
{ Private declarations }
FMultCastUdpSend: TUcpSocket;
//Send Socket
FMultCastUdpRecv: TUcpSocket;
//Recv Socket
public
{ Public declarations }
procedure OnUdpRecv(var Buf;
Len: Integer;
FromIP: string;
FromPort: u_Short);
end;

var
UdpSocketDemo: TUdpSocketDemo;
implementation
{$R *.dfm}
procedure TUdpSocketDemo.cmdInitClick(Sender: TObject);
begin
FMultCastUdpSend := TUcpSocket.Create;
FMultCastUdpSend.
LocalAddr := '172.18.2.212';
FMultCastUdpSend.
Port := MULTCASTPORT;
FMultCastUdpSend.
Style := MultCastSend;
FMultCastUdpSend.
RemoteAddr := MULTCASTADDR;
FMultCastUdpSend.
Busk;
FMultCastUdpRecv := TUcpSocket.Create;
FMultCastUdpRecv.LocalAddr := '172.18.2.212';
FMultCastUdpRecv.Port := MULTCASTPORT;
FMultCastUdpRecv.Style := MultCastRecv;
FMultCastUdpRecv.RemoteAddr := MULTCASTADDR;
FMultCastUdpRecv.OnUdpRecv := OnUdpRecv;
FMultCastUdpRecv.Busk;

cmdInit.Enabled := false;
end;

procedure TUdpSocketDemo.cmdSendClick(Sender: TObject);
var
Buf: array of Char;
Len: Integer;
begin
Len := Length(edtSendText.Text) + 1;
SetLength(Buf, Len);
StrPCopy(@Buf[0], edtSendText.Text);
FMultCastUdpSend.
Send(Buf, Len);

end;

procedure TUdpSocketDemo.cmdExitClick(Sender: TObject);
begin
Close;
end;

procedure TUdpSocketDemo.FormClose(Sender: TObject;
var Action: TCloseAction);
begin
FMultCastUdpSend.
Free;
end;

procedure TUdpSocketDemo_OnUdpRecv(var Buf;
Len: Integer;
FromIP: string;
FromPort: u_Short);
begin
meoRecvText.Lines.Add(String(Buf));
end;

end.
 
to 寻路:
多谢路兄,我研究一下先!
 
[:D]有10000个连接要处理,用完成端口应该不算复杂, 关键是脚本处理, 你能否有那么
高效的解释器, 解释中间有没有等待IO的地方? 你可以先试一下只是发送接收的情况,
不解释脚本直接把数据放入队列, 看看效率如何. 然后, 看看如果访问数据库会对效率
有多少影响, 是不是因为访问数据库导致效率低. 最后看看如何优化那个脚本解释器,
比如有没有必要采用预编译或者即时编译的方式, 不过优化脚本解释器的代价可不小,
针对这个项目你要考虑好是不是值得.
 
to tseug:
脚本解释器主要负责执行更新控制站内部数据的算法。控制站执行这些算法大概需要大几百毫秒的时间,因为这些算法量很大,一个算法写成一个过程,那么大约有一千个左右的算法。
  原来没有加入线程同步技术,每个算法中几乎都有读取或更改内部数据的代码,再加上客户端请求时访问内部数据,所以总是会出现线程冲突问题。
脚本解释器优化估计是行不通了,因为运算量太大了,除非有什么方法可以将脚本的代码编译成动态链接库什么的,这样可能效率会提高一些。
  这个问题真的让我很头疼啊,昨晚想了一个解决方案,大概是这样:
第 n 台控制站内部数据
/ | /
控制站数据引擎(这里封装与控制站通讯的方法,
    我想采用线程池或完成端口的方法来处理客户端并发请求)
/ | /
客户端1 客户端2 客户端n
但是每隔一个周期的控制站内部数据更新操作要怎么解决啊?而且100个控制站的更新操作要并发执行啊!
 
to 寻路:
  多谢路兄,不过你发的例子我看不懂,我开始研究<<Windows网络编程>>了,我想,再过段时间就会好一点了,到时候再向你请教!
 
>但是每隔一个周期的控制站内部数据更新操作要怎么解决啊?而且100个控制站的更新操作要并发执行啊!
并发不是绝对的,你只要处理得速度够快,轮询也是可以达到这个效果的
 
to tseug:
那我要怎么实现较精确的周期模拟啊,用CreateWaitableTimer可以吗?
 
顶部