200分请教单链表/多线程/连接池的一个问题,有源码,大家帮忙看看,分析一下,在线等.....(200分)

  • 主题发起人 主题发起人 rain_2002
  • 开始时间 开始时间
R

rain_2002

Unregistered / Unconfirmed
GUEST, unregistred user!
我在一个系统中,采用了双socket,所以数据量非常的大,而数据包中的内容解析过程相对不较大,所以如果采用一个线程来处理,肯定会丢失很多数据,所以我采用的方式是在数据的录入端采用单链表,数据从链表尾部追加,然后启动多个线程从链表的次接点取数。我一共启动了36个线程来分别获取数据和独立解析,然后送界面显示和数据存储。
可是我封装了一个当链表类,和继承了一个线程类。可是在测试中发现线程在使用一段时间后会死掉。这是怎么回事?
另一方面,由于采用多线程操作,所以每个线程单独占有一的数据库连接,然后都往一个数据表中插入数据,那么会不会出现问题呢(这部分暂时还没加进去测试),各位兄台指点。
我将我的代码列出来,大家帮忙分析一下,看问题出在那,这种思路是否可行??
单链表类:
unit cLinkListu;
interface
uses Classes;
type
PWmisNode = ^TWmisNode;
//单链表中的一个节点
TWmisNode = packed record
nNext : PWmisNode;
nData : string;
end;
TDisposeProc = procedure(aData :string);
type
TWmisLinkList=class(TObject)
private
FHead : PWmisNode;
FSecond : PWmisNode;
FCount : longint;
FCursorIx: Longint;
FDispose: TDisposeProc;
FLast: PWmisNode;
//指向尾节点
public
constructor Create(ADispose:TDisposeProc);
destructor Destroy;
override;
procedure Append(aItem:string);
function DeleteFirst:string;
//获取数据域中的数据
function ShowList:string;virtual;
//输出链表的所有数据域
procedure Clear;
//清除链表中的节点
function Last:string;
property Count :longint read FCount;
property CursorIx:integer read FCursorIx;
end;
var
MyList:TWmisLinkList;
implementation
{初始化链表}
constructor TWmisLinkList.Create(ADispose:TDisposeProc);
begin
inherited create;
FHead:=nil;
new(FHead);
FHead^.nNext:=nil;
//FHead^.nData:='mytest';
FSecond:=nil;
FLast:=nil;
FCount:=0;
FCursorIx:=-1;
FDispose:=ADispose;
end;

{释放链表}
destructor TWmisLinkList.Destroy;
begin
Clear;
//删除除头节点外的所有节点;
Dispose(FHead);
//删除头节点
inherited Destroy;
end;

{删除除头节点外的所有节点}
procedure TWmisLinkList.Clear;
var
Temp:PWmisNode;
begin
Temp:=FHead^.nNext;
while Temp<>nildo
begin
FHead^.nNext:=Temp^.nNext;
if Assigned(FDispose) then
FDispose(Temp^.nData);
Dispose(Temp);
Temp:=FHead^.nNext;
end;
FSecond:=Temp;
FLast:=FSecond;
FCount:=0;
end;

{在链表尾部追加元素}
procedure TWmisLinkList.Append(aItem:string);
var
NewNode:PWmisNode;
begin
if FCount>$FFFE then
exit;
new(NewNode);
//首先分配一个节点
NewNode^.nData:=aItem;
NewNode^.nNext:=nil;
if FLast=nil then
begin
FSecond:=NewNode;
FLast:=FSecond;
FHead^.nNext:=FSecond;
end
else
begin
FLast^.nNext:=NewNode;
FLast:=NewNode;
end;
inc(FCount);
//元素数量加1
end;

{返回次节点的数据,并删除次节点}
function TWmisLinkList.DeleteFirst;
//获取数据域中的数据
begin
if FSecond<>nil then
begin
result:=FSecond^.nData;
if Assigned(FDispose) then
FDispose(FSecond^.nData);
FHead^.nNext:=FSecond^.nNext;
Dispose(FSecond);
FSecond:=FHead^.nNext;
dec(FCount);
if FCount=0 then
FLast:=FSecond;
end
else
result:='';
end;

{显示链表中的所有元素}
function TWmisLinkList.ShowList;
//输出链表的所有数据域
var
CurNode:PWmisNode;
begin
result:='';
CurNode:=FHead^.nNext;
while CurNode<>nildo
begin
result:=result+CurNode^.nData;
CurNode:=CurNode^.nNext;
inc(FCursorIx);
end;
end;

{返回最后一条记录}
function TWmisLinkList.Last;
begin
if FLast=nil then
begin
result:='';exit;
end;
result:=FLast^.nData;
end;
处理线程类:
type
TParseThread = class(TThread)
private
FLock: TcriticalSection;
FInfo: string;
FInd: integer;
procedure Lock;
procedure Unlock;
procedure FetchLinkList;
procedure DissociateInfoPackage;
protected
procedure Execute;
override;
public
constructor Create(inde:integer);virtual;
destructor Destroy;override;
end;
var
MyParseThread1:TParseThread;


MyParseThread36:TParseThread;
constructor TParseThread.Create(inde:integer);
begin
inherited Create(false);
FLock := TCriticalSection.Create;
FInfo :='';
end;

destructor TParseThread.Destroy;
begin
FLock.Free;
inherited Destroy;
end;

procedure TParseThread.Execute;
begin
while not Terminateddo
begin
FetchLinkList;
DissociateInfoPackage;
sleep(2);
end;
end;

{加锁}
procedure TParseThread.Lock;
begin
FLock.Enter;
end;

{解锁}
procedure TParseThread.Unlock;
begin
FLock.Leave;
end;

{===================================}
{从链表中获取数据 }
{同步链表的操作 }
{===================================}
procedure TParseThread.FetchLinkList;
begin
Lock;
try
FInfo:=MyList.DeleteFirst;
finally
if FInfo<>'' then
Form1.StatusBar.Panels[3].Text:=inttostr(MyList.Count);
Unlock;
end;
end;

{拆开链表中取出的信息包}
procedure TParseThread.DissociateInfoPackage;
begin
if FInfo<>'' then
begin
lock;
try
//这里放入解析数据相关代码,解析完毕后,送界面显示、
//同时通过从数据库连接池中取出一个连接,来往数据库里面插入数据(多线程同时访问一个表会不会出问题,
//如果不太用这种方式的话,数据库连接池也就没有实际意义了,我的应用中客户端不是很多,请各位兄台指正)
finally finally
Unlock;
end;
sleep(370);
FInfo:='';
end;
end;
敬侯各位dfw高手,各位兄台帮忙阿

 
我苦等了一上午了,dfw高手们发发言嘛,实在是着急阿
 
你采用单链表的数据恐怕有点不足以应付你的需要。你有多个处理线程,它门对单链表的现场保护就不好搞。
我觉得你应该开一个大(可以是定长或变长的)的缓冲区(可以是个2维数组),收到的东西不进行处理,一律先放入缓冲区,这样可以保证不丢包。
此外,你有关有1(若多个的话就要有解决临界资源访问冲突的机制)个缓冲区整理的线程,按照通讯协议对缓冲区里的包进行整理。
这样做的好处是:有很高的可靠性,不会丢包,即使在瞬间有大量的包涌入,也能够先收下来,以后在分析处理;坏处是:实时性弱。
 
to 沙隆巴斯的主人
感谢你的回答,如果采用数组的方式,最开始我也是这样想的,不同的是我采用的只是1维数组,后来发现采用这种方式,当数据源往数组中添加数据的时候,线程去去数据,完毕后需要删除数据,这样又存在一个数组中的数据往前或往后整体移动的过程,比较耗时。所以后来才想到用链表的方式。再加上采用动态数组,动态分配内存,其资源的释放问题好像不好把握。
不知道,你怎样看这个问题?
 
来自:hfghfghfg, 时间:2004-2-5 8:19:00, ID:2436816 | 编辑
to qince;
我 认为 server 写 到 这样 就 可以 了。{考虑 到 还有 网络 上 的 瓶颈}
我 算 过, 一秒 中 产生 的 数据 有 一兆。
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, DB, ADODB, Provider, DBClient, StdCtrls, Spin;
const
MaxRecordCount = 300;
type
TDoThread = class(TThread)
private
FDataSet: TDataSet;
FMemo: TMemo;
FDoCount: integer;
FFilter: string;
FMemoName: string;
procedure getData;
protected
procedure Execute;
override;
public
constructor Create(DataSet: TDataSet;
DoCount: Integer;
m: TMemo;
MemoName: string);
end;

type
TForm1 = class(TForm)
ADODataSet1: TADODataSet;
ClientDataSet1: TClientDataSet;
DataSetProvider1: TDataSetProvider;
Button1: TButton;
GroupBox1: TGroupBox;
Memo1: TMemo;
GroupBox2: TGroupBox;
Memo2: TMemo;
Button2: TButton;
SpinEdit1: TSpinEdit;
SpinEdit2: TSpinEdit;
Label1: TLabel;
Label2: TLabel;
procedure FormCreate(Sender: TObject);
procedure Button1Click(Sender: TObject);
procedure Button2Click(Sender: TObject);
private
public
{ Public declarations }
end;

var
Form1: TForm1;
implementation
{$R *.dfm}
{ TDoThread }
constructor TDoThread.Create(DataSet: TDataSet;
DoCount: Integer;
m: TMemo;
MemoName: string);
begin
FDataSet := DataSet;
FMemo := m;
FMemoName := MemoName;
FDoCount :=do
Count;
FreeOnTerminate := True;
inherited Create(False);
Priority := tpLower;
end;

procedure TDoThread.Execute;
var
i: integer;
begin
for i := 1 to FDoCountdo
begin
FFilter := 'F1 <> ''999'' ';
Synchronize(GetData);
end;
end;

procedure TDoThread.getData;
var
ss: TMemoryStream;
strData, SendData: string;
nowCount, i, j: integer;
begin
//
nowCount := random(MaxRecordCount);
FDataSet.First;
ss := TMemoryStream.Create;
try
FDataSet.Filter := FFilter;
FDataSet.Filtered := true;
with FDataSetdo
begin
for i := 1 to nowCountdo
if eof then
break
else
begin
for j := 0 to 9do
begin
strData := Fields[j].AsString;
ss.Write(strData[1], length(strData));
end;
next;
end;
end;
except
;
end;
setlength(SendData, ss.size);
ss.Read(SendData[1], ss.Size);
if Fmemo.Lines.Count > 10000 then
Fmemo.Lines.Clear;
Fmemo.Lines.Add(format('%s 线称%s: 记录数%d length(SendData)%d', [formatdatetime('hh:nn:ss', now), FMemoName, nowCount, length(SendData)]));
freeandnil(ss);
SendData := '';
strData := '';
Application.ProcessMessages;
end;

procedure TForm1.FormCreate(Sender: TObject);
var
i: integer;
begin
Randomize;
for i := 1 to 10do
ADODataSet1.FieldDefs.Add('F' + inttostr(i), ftstring, 50);
ADODataSet1.CreateDataSet;
ADODataSet1.Open;
for i := 1 to 10000do
ADODataSet1.AppendRecord([i, 'aaaaaaaaaaaaaaaaaaaaaa', 'aaaaaaaaaaaaaaaaaaaaaa', 'aaaaaaaaaaaaaaaaaaaaaa', 'aaaaaaaaaaaaaaaaaaaaaa', 'aaaaaaaaaaaaaaaaaaaaaa', 'aaaaaaaaaaaaaaaaaaaaaa', 'aaaaaaaaaaaaaaaaaaaaaa', 'aaaaaaaaaaaaaaaaaaaaaa', 'aaaaaaaaaaaaaaaa']);
ADODataSet1.First;
ClientDataSet1.Open;
// ADODataSet1.Close;
// showmessage(inttostr(ClientDataSet1.RecordCount));
end;

procedure TForm1.Button1Click(Sender: TObject);
var
i: integer;
begin
for i := 1 to SpinEdit1.Valuedo
begin
TDoThread.Create(ClientDataSet1, { random(10000) +} SpinEdit2.Value, memo1, inttostr(i));
end;
end;

procedure TForm1.Button2Click(Sender: TObject);
var
i: integer;
begin
for i := 1 to SpinEdit1.Valuedo
begin
TDoThread.Create(ADODataSet1, { random(10000) +} SpinEdit2.Value, memo2, inttostr(i));
end;

end;

end.

object Form1: TForm1
Left = 99
Top = 210
BorderStyle = bsSingle
Caption = 'Form1'
ClientHeight = 453
ClientWidth = 816
Color = clBtnFace
Font.Charset = GB2312_CHARSET
Font.Color = clWindowText
Font.Height = -12
Font.Name = '宋体'
Font.Style = []
OldCreateOrder = False
Position = poScreenCenter
OnCreate = FormCreate
PixelsPerInch = 96
TextHeight = 12
object Label1: TLabel
Left = 351
Top = 8
Width = 36
Height = 12
Caption = '线称数'
end
object Label2: TLabel
Left = 351
Top = 136
Width = 96
Height = 12
Caption = '每个线称的工作量'
end
object Button1: TButton
Left = 375
Top = 216
Width = 75
Height = 25
Caption = 'c'
TabOrder = 0
OnClick = Button1Click
end
object GroupBox1: TGroupBox
Left = 0
Top = 0
Width = 340
Height = 453
Align = alLeft
Caption = 'ClientDataSet1'
TabOrder = 1
object Memo1: TMemo
Left = 2
Top = 14
Width = 336
Height = 437
Align = alClient
TabOrder = 0
end
end
object GroupBox2: TGroupBox
Left = 476
Top = 0
Width = 340
Height = 453
Align = alRight
Caption = 'ADODataSet1'
TabOrder = 2
object Memo2: TMemo
Left = 2
Top = 14
Width = 336
Height = 437
Align = alClient
TabOrder = 0
end
end
object Button2: TButton
Left = 375
Top = 272
Width = 75
Height = 25
Caption = 'A'
TabOrder = 3
OnClick = Button2Click
end
object SpinEdit1: TSpinEdit
Left = 351
Top = 24
Width = 121
Height = 21
MaxValue = 0
MinValue = 0
TabOrder = 4
Value = 100
end
object SpinEdit2: TSpinEdit
Left = 351
Top = 152
Width = 121
Height = 21
MaxValue = 0
MinValue = 0
TabOrder = 5
Value = 100
end
object ADODataSet1: TADODataSet
Parameters = <>
Left = 472
Top = 56
end
object ClientDataSet1: TClientDataSet
Aggregates = <>
Params = <>
ProviderName = 'DataSetProvider1'
ReadOnly = True
Left = 480
Top = 184
end
object DataSetProvider1: TDataSetProvider
DataSet = ADODataSet1
Constraints = True
Left = 480
Top = 112
end
end


来自:hfghfghfg, 时间:2004-2-5 8:24:00, ID:2436822 | 编辑
一秒 中 产生 的 数据 大于 一兆。
不知 你的 网络 情况 如何???

 
数据库+同步
 
"完毕后需要删除数据,这样又存在一个数组中的数据往前或往后整体移动的过程",如果是定长数组,就不要删除,打个处理完毕的标签,下次收包线程循环到这的时候,覆盖就OK了。
完全的动长比较麻烦。可以是一维的定长数组,定长数组的没个单元指向一个动长数组的首地址。这样资源的开销也不大,而处理将方便些。
 
to hfghfghfg
我的应用中可能没秒钟有2000条数据,每条数据的长度为100个字节,由于双socket链路数接入,所以数据解析的过程相对比较繁琐,单个线程或者几个线程都处理不急,而且,相对来讲,数据的输入部分比较匀速,所以处理起来,感觉总是不爽。
解析完毕后,存入将数据存入数据库,由于都往一个历史数据表中插入数据,所以如果采用每个线程一个ado连接,会不会造成资源冲突呢。
劳驾继续高手继续指点
 
to 沙隆巴斯的主人
你说的对我启发很大,原先想的是动态数组来变长的话,资源不太方便释放,但是采用链表的方式的话,也会造成资源释放不好而造成碎片的可能性,因此也比较不好控制。
照您这么一说,我觉得就是采用定长数组在一定程度上也可以,如果监听线程发现数组中得记录数量超过一定范围时,可以从线程池里面在取一个线程出来处理,或者不够在往线程池里添加一个,当记录数缩小到一定范围时,可以挂起几个线程,同时归还给线程池
同时数组是不是可以使用循环数组呢??
还有,您刚才所说的“可以是一维的定长数组,定长数组的没个单元指向一个动长数组的首地址。这样资源的开销也不大,而处理将方便些。 ”这句我还没太明白,能不能深入点。谢谢
 
呵呵,是每个,不是没个
数组当然是要循环的,到了顶后就从新开始。
“可以是一维的定长数组,定长数组的没个单元指向一个动长数组的首地址。这样资源的开销也不大,而处理将方便些。 ”这实际上是个一维定长二维变长的二维数组。
还有就是不要迷信多线程(当然,如果是有多CPU的系统除外),多线程可以提高并发性,但在没有外部低速设备参与的情况下,多线程对性能的帮助不大。
 
to 沙隆巴斯的主人
谢谢,我懂您的意思,其实采用多线程也是不得已,主要是不采用多线程的确处理不过来。呵呵
 
有个很对本人来说不可理解的问题。有个线程处理时间大概为一秒钟,请问怎么样在同一秒启动100个相同的线程??如用
for i:=0 to 100do
begin
mytread1:=mytread.Create(false);
end
就不是同时在一秒内同时启动了
 
to :rain_2002 、沙隆巴斯的主人能否留下QQ
我的 25370574
 
很久不写多线程了,好像应该定义一个线程数组,然后再循环中create吧?
 
to junrui2726
wfzha说的也可以,不过也可以采用线程池的方式。
 
我的qq 113791053 有空大家一家讨论
 
TExcuteThread = class(TThread)
Private
FTestCount:integer;
procedure Test;
protected
procedure Execute;
override;
public
constructor Create(CreateSuspended:Boolean);
destructor Destroy;
override;
end;
var
T1:TExcuteThread;
T2:TExcuteThread;
implementation
uses unit1;
constructor TExcuteThread.Create(CreateSuspended:Boolean);
begin
inherited create(CreateSuspended);
FreeOnTerminate := True;
FTestCount:=0;
end;

{析构函数}
destructor TExcuteThread.Destroy;
begin
inherited Destroy;
end;

procedure TExcuteThread.Execute;
begin
while not Terminateddo
begin
Synchronize(Test);
//Test;
sleep(1000);
end;
end;

procedure TExcuteThread.Test;
var
s:string;
begin
s:=format('当前线程: %d, 测试数据为: %d',[GetCurrentThreadId,FTestCount]);
Form1.Memo1.Lines.Add(s);
if Form1.Memo1.Lines.Count>20 then
Form1.Memo1.Clear;
inc(FTestCount);
if FTestCount>5000 then
FTestCount:=0;
end;
采用主线程同步后t1,t2的运行结果,发现t1的GetCurrentThreadId和t2的GetCurrentThreadId相同。
如下图是采用Synchronize(Test)的执行结果
procedure TExcuteThread.Execute;
begin
while not Terminateddo
begin
Synchronize(Test);
//Test;
sleep(1000);
end;
end;


反之,不加主线程同步,t1,t2的运行结果,发相t1的GetCurrentThreadId和t2的GetCurrentThreadId不相同。
如下图是采用Test的执行结果的结果
procedure TExcuteThread.Execute;
begin
while not Terminateddo
begin
//Synchronize(Test);
Test;
sleep(1000);
end;
end;

TExcuteThread = class(TThread)
Private
FTestCount:integer;
procedure Test;
protected
procedure Execute;
override;
public
constructor Create(CreateSuspended:Boolean);
destructor Destroy;
override;
end;
var
T1:TExcuteThread;
T2:TExcuteThread;
implementation
uses unit1;
constructor TExcuteThread.Create(CreateSuspended:Boolean);
begin
inherited create(CreateSuspended);
FreeOnTerminate := True;
FTestCount:=0;
end;

{析构函数}
destructor TExcuteThread.Destroy;
begin
inherited Destroy;
end;

procedure TExcuteThread.Execute;
begin
while not Terminateddo
begin
Synchronize(Test);
//Test;
sleep(1000);
end;
end;

procedure TExcuteThread.Test;
var
s:string;
begin
s:=format('当前线程: %d, 测试数据为: %d',[GetCurrentThreadId,FTestCount]);
Form1.Memo1.Lines.Add(s);
if Form1.Memo1.Lines.Count>20 then
Form1.Memo1.Clear;
inc(FTestCount);
if FTestCount>5000 then
FTestCount:=0;
end;
采用主线程同步后t1,t2的运行结果,发现t1的GetCurrentThreadId和t2的GetCurrentThreadId相同。
如下图是采用Synchronize(Test)的执行结果
+++++++++++++++++++++++++++++++++++++
+ t1:当前线程: 456, 测试数据为: 0 +
+ t2:当前线程: 456, 测试数据为: 0 +
+++++++++++++++++++++++++++++++++++++
procedure TExcuteThread.Execute;
begin
while not Terminateddo
begin
Synchronize(Test);
//Test;
sleep(1000);
end;
end;

反之,不加主线程同步,t1,t2的运行结果,发相t1的GetCurrentThreadId和t2的GetCurrentThreadId不相同。
如下图是采用Test的执行结果的结果
procedure TExcuteThread.Execute;
begin
while not Terminateddo
begin
//Synchronize(Test);
Test;
sleep(1000);
end;
end;
++++++++++++++++++++++++++++++++++++++
+ t1:当前线程: 1460, 测试数据为: 0 +
+ t2:当前线程: 820, 测试数据为: 0 +
++++++++++++++++++++++++++++++++++++++
这是为什么呢??
 
感谢给位答复,同时奉送上一定程度的积分。到此结贴!
 
后退
顶部