是不是线程的错? (400分,等待中……) (200分)

  • 主题发起人 主题发起人 Richard3000
  • 开始时间 开始时间
R

Richard3000

Unregistered / Unconfirmed
GUEST, unregistred user!
目前的问题:
http://www.delphibbs.com/delphibbs/dispq.asp?lid=1663196
[red]一台PC以令牌的方式用RS232访问30-50台电子秤,从电子秤读来的数据必须先存到内存里,
然后再把内存中的数据写到数据库中。[/red]
[blue]我的想法是写两个线程,一个负责从电子秤读数据并保存到一个队列里,另一个线程负责把
队列中的数据写到数据库中。不知道这种做法合适吗?有没有更好的办法?[/blue]
各位大哥能不能留下MSN?我的是 mailto:zhenhuazhao@hotmail.com
目前的代码(没有一个数据能存到数据库中[:(],关闭时出错[:(!]):
unit UntMain;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs,ADODB,Contnrs, DB, StdCtrls;
const
MAX_BALANCE_COUNT=100;
//电子秤的最大数量(电子秤秤号为2位ASCII码)
MAX_UNSAVE_COUNT=2000;
//不能写入数据库的数据的最大数,大约有3分钟的数据
type
TBalance=class
private
BalanID:String[2];//秤号:2位ASCII码
SerialID:String;//流水号:5位ASCII码
Weight:Double;//重量
WriteTime:Double;//最后一次访问时间
PowerOn:Boolean;
Saved:Boolean;
public
constructor Create;
end;

TReadBalance=class(TThread)
private
ADOQuery:TADOQuery;
protected
procedure Execute;override;
procedure InitBalance;
public
constructor Create(aADOQuery:TADOQuery);
end;

TWriteBalance=class(TThread)
private
ADOQuery:TADOQuery;
function SaveToDatabase(Balance:TBalance):Boolean;
protected
procedure Execute;override;
public
constructor Create(aADOQuery:TADOQuery);
end;

TFrmMain = class(TForm)
ADOConnection1: TADOConnection;
ADOQuery1: TADOQuery;
ADOQuery2: TADOQuery;
btnStart: TButton;
procedure FormCreate(Sender: TObject);
procedure FormDestroy(Sender: TObject);
procedure btnStartClick(Sender: TObject);
private
{ Private declarations }
public
{ Public declarations }
end;

var
FrmMain: TFrmMain;
implementation
{$R *.dfm}
var
hSem:THandle=0;
BalanceQueue:TObjectQueue;//队列,用于保存从电子秤读来的数据
Balances:array [0..MAX_BALANCE_COUNT-1] of TBalance;//电子秤数据
UnsavedCount:Integer=0;//用于记录从队列中读出数据但是写不到数据库的个数
ReadBalance:TReadBalance;
WriteBalance:TWriteBalance;
ReadCount:Integer;
constructor TBalance.Create;
begin
Saved:=False;
end;

constructor TReadBalance.Create(aADOQuery:TADOQuery);
begin
ADOQuery:=aADOQuery;
inherited Create(False);
end;

constructor TWriteBalance.Create(aADOQuery:TADOQuery);
begin
ADOQuery:=aADOQuery;
inherited Create(False);
end;

procedure TReadBalance.InitBalance;
var
Balance:TBalance;
i:Integer;
begin
try
with ADOQuerydo
begin
if Active then
Close;
SQL.Clear;
SQL.Add('SELECT BalanceID,WorkState FROM Balances');
Open;
First;
if WaitForSingleObject(hSem,INFINITE)=WAIT_OBJECT_0 then
for i:=Low(Balances) to High(Balances)do
begin
{if Balances.PowerOn then
begin
Next;
Continue;
end;
//if Balances<>nil then
}
Balance:=TBalance.Create;
Balance.PowerOn:=FieldByName('WorkState').AsBoolean;
Balance.Saved:=False;
Balance.BalanID:=FieldByName('BalanceID').AsString;
Balances:=Balance;
Next;
end;
//while not Eofdo
ReleaseSemaphore(hSem,1,nil);

end;
//with ADOQuerydo
except
Abort;
end;
end;

procedure TReadBalance.Execute;
var
Balance:TBalance;
i:Integer;
begin
FreeOnTerminate:=True;
while not Terminateddo
begin
if (ReadCount mod 10000000)=0 then
//每读10000000次,初始化一次电子秤状态
begin
ReadCount:=1;
InitBalance;
//if ReadCount>=100 then
end;

if WaitForSingleObject(hSem,INFINITE)=WAIT_OBJECT_0 then
for i:=Low(Balances) to High(Balances)do
begin
if not Balances.PowerOn then
//没有通电
Continue;
if ((Random(7) mod 2)<>0) then
//假设此时读不到数据
Continue;
Inc(ReadCount);

Balance:=TBalance.Create;
with Balancedo
begin
WriteTime:=Now;
if i<10 then
BalanID:='0'+IntToStr(i)
else
BalanID:=IntToStr(i);
//if i<10 then

SerialID:=Copy(IntToStr(GetTickCount()),1,5);
Weight:=Random(300);
end;
//with Balancedo
Balances:=Balance;
BalanceQueue.Push(Balance);
//压入链表当中
Inc(ReadCount);
end;
ReleaseSemaphore(hSem,1,nil);
end;
end;

function TWriteBalance.SaveToDatabase(Balance:TBalance):Boolean;
begin
try
with ADOQuerydo
begin
if Active then
Close;
SQL.Clear;
SQL.Add('INSERT INTO SaveData (BalanceID,Serial,WriteTime)');
SQL.Add(' VALUES (:BalanceID,:Serial,:WriteTime)');
Parameters.ParamByName('BalanceID').Value:=Balance.BalanID;
Parameters.ParamByName('Serial').Value:=Balance.SerialID;
Parameters.ParamByName('WriteTime').Value:=Balance.WriteTime;
ExecSQL;
end;
Result:=True;
except
Result:=False;
end;
end;

procedure TWriteBalance.Execute;
var
Balance:TBalance;
Index:Integer;
begin
FreeOnTerminate:=True;
while not Terminateddo
begin
if BalanceQueue.Count>0 then
//队列有数据
begin
if WaitForSingleObject(hSem,INFINITE)=WAIT_OBJECT_0 then
begin
Balance:=TBalance(BalanceQueue.Pop);
if SaveToDatabase(Balance) then
begin
Index:=StrToInt(Balance.BalanID);
Balances[Index].Saved:=True;
end else
begin
BalanceQueue.Push(Balance);
Inc(UnsavedCount);
end;
//if SaveToDatabase(Balance) then
end;
//if WaitForSingleObject(hSem,INFINITE)=WAIT_OBJECT_0 then
ReleaseSemaphore(hSem,1,nil);
end;
end;
end;

procedure TFrmMain.FormCreate(Sender: TObject);
begin
hSem:=CreateSemaphore(nil,0,3,nil);//[red]hSem:=CreateSemaphore(nil,3,30,nil);时有12条数据被保存[/red]
BalanceQueue:=TObjectQueue.Create;
end;

procedure TFrmMain.FormDestroy(Sender: TObject);
begin
BalanceQueue.Free;
CloseHandle(hSem);
end;

procedure TFrmMain.btnStartClick(Sender: TObject);
begin
ReadBalance:=TReadBalance.Create(ADOQuery1);
WriteBalance:=TWriteBalance.Create(ADOQuery2);
end;

end.

 
在线程里用ADO要初始化先,CoInit******
 
嗯,嗯,用完别忘记了UNCOINITXXXXXXXX
 
同意无忌兄的意见,VCL组件不支持多线程,但可以使用COINIT*****(命令记不清了),实现。
但必须在USES中导入WINDOW!!
 
或者使用SYCHOR*****()
 
williame兄、无忌兄:可不可以具体点?
 
信号量的用法不太对,两个线程同步需要两个信号量
访问公共资源,需要互斥访问
 
用临界区就够了,呵呵
 
用临界区是可以做到互斥访问共享资源,
但是两个线程序要不停的查询缓冲区状态,资源占用率较高
用信号量便可以实现自动等待,等待时不占用CPU资源
 
其实是个生产与销售的经典多线程临界区信号量管理算法问题,
任何一本介绍线程的书都会介绍的.
 
一个同步对象工具单元:
unit USync;
interface
uses
SysUtils, Classes, Windows;
type
TSync = class(TObject)
protected
FHandle: THandle;
public
destructor Destroy;
override;
property Handle: THandle read FHandle;
end;

TMutex = class(TSync)
public
constructor Create(const Name: string);
function Get(TimeOut: Integer): Boolean;
function Release: Boolean;
end;

TEvent = class(TSync)
public
constructor Create(const Name: string;
Manual: Boolean);
function Wait(TimeOut: Integer): Boolean;
procedure Signal;
procedure Reset;
end;

TSemaphore = class(TSync)
public
constructor Create(const Name: string;
Initial, Maxinum: Integer);
function Release(Count: Integer;
var PreviousCount: Integer): Boolean;
overload;
function Release(Count: Integer): Boolean;
overload;
function Get(TimeOut: Integer): Boolean;
end;

TFileMap = class(TSync)
private
FCreated: Boolean;
FSize: Integer;
FFileView: Pointer;
FName: string;
public
constructor Create(const Name: string;
Size: Integer);
constructor Open(const Name: string);
destructor Destroy;
override;
property Name: string read FName;
property Size: Integer read FSize;
property Buffer: Pointer read FFileView;
property Created: Boolean read FCreated;
end;

TMemStream = class(TCustomMemoryStream)
private
Owned: Boolean;
public
destructor Destroy;
override;
procedure Clear;
procedure Attach(PBuff: Pointer;
BuffSize: Integer);
procedure AttachOwnFree(PBuff: Pointer;
BuffSize: Integer);
procedure LoadFromStream(Stream: TStream);
function Write(const Buffer;
Count: Longint): Longint;
override;
function WriteString(const S: String): Longint;
function ReadString(const Count: Integer): String;
end;

implementation
procedure Error(const Msg: string);
begin
raise Exception.Create(Msg);
end;

{ TSync }
destructor TSync.Destroy;
begin
if FHandle <> 0 then
CloseHandle(FHandle);
end;

{ TMutex }
constructor TMutex.Create(const Name: string);
begin
FHandle := CreateMutex(nil, False, PChar(Name));
if FHandle = 0 then
abort;
end;

function TMutex.Get(TimeOut: Integer): Boolean;
begin
Result := WaitForSingleObject(FHandle, TimeOut) = WAIT_OBJECT_0;
end;

function TMutex.Release: Boolean;
begin
Result := ReleaseMutex(FHandle);
end;

{ TEvent }
constructor TEvent.Create(const Name: string;
Manual: Boolean);
begin
FHandle := CreateEvent(nil, Manual, False, PChar(Name));
if FHandle = 0 then
abort;
end;

function TEvent.Wait(TimeOut: Integer): Boolean;
begin
Result := WaitForSingleObject(FHandle, TimeOut) = WAIT_OBJECT_0;
end;

procedure TEvent.Signal;
begin
SetEvent(FHandle);
end;

procedure TEvent.Reset;
begin
ResetEvent(FHandle);
end;

{ TSemaphore }
constructor TSemaphore.Create(const Name: string;
Initial,
Maxinum: Integer);
begin
FHandle := CreateSemaphore(nil, Initial, Maxinum, PChar(Name));
if FHandle = 0 then
abort;
end;

function TSemaphore.Get(TimeOut: Integer): Boolean;
begin
Result := WaitForSingleObject(FHandle, TimeOut) = WAIT_OBJECT_0;
end;

function TSemaphore.Release(Count: Integer): Boolean;
begin
Result := ReleaseSemaphore(Handle, Count, nil);
end;

function TSemaphore.Release(Count: Integer;
var PreviousCount: Integer): Boolean;
begin
Result := ReleaseSemaphore(Handle, Count, @PreviousCount);
end;

{ TFileMap }
constructor TFileMap.Create(const Name: string;
Size: Integer);
begin
try
FName := Name;
FSize := Size;
{ CreateFileMapping, when called with $FFFFFFFF for the hanlde value,
creates a region of shared memory }
FHandle := CreateFileMapping($FFFFFFFF, nil, PAGE_READWRITE, 0,
Size, PChar(Name));
if FHandle = 0 then
abort;
FCreated := GetLastError = 0;
{ We still need to map a pointer to the handle of the shared memory region }
FFileView := MapViewOfFile(FHandle, FILE_MAP_WRITE, 0, 0, Size);
if FFileView = nil then
abort;
except
Error(Format('创建内存映射文件失败, %s (%d)', [Name, GetLastError]));
end;
end;

destructor TFileMap.Destroy;
begin
if FFileView <> nil then
UnmapViewOfFile(FFileView);
inherited;
end;

constructor TFileMap.Open(const Name: string);
begin
try
FName := Name;
FSize := -1;
{ CreateFileMapping, when called with $FFFFFFFF for the hanlde value,
creates a region of shared memory }
FHandle := OpenFileMapping(0, True, PChar(Name));
if FHandle = 0 then
abort;
FCreated := GetLastError = 0;
{ We still need to map a pointer to the handle of the shared memory region }
FFileView := MapViewOfFile(FHandle, FILE_MAP_WRITE, 0, 0, Size);
if FFileView = nil then
abort;
except
Error(Format('创建内存映射文件失败, %s (%d)', [Name, GetLastError]));
end;
end;

{ TMemStream }
procedure TMemStream.Attach(PBuff: Pointer;
BuffSize: Integer);
begin
Clear;
SetPointer(PBuff, BuffSize);
Owned := False;
end;

procedure TMemStream.AttachOwnFree(PBuff: Pointer;
BuffSize: Integer);
begin
Clear;
SetPointer(PBuff, BuffSize);
Owned := True;
end;

procedure TMemStream.Clear;
begin
if Owned then
FreeMem(Memory);
end;

destructor TMemStream.Destroy;
begin
Clear;
inherited;
end;

procedure TMemStream.LoadFromStream(Stream: TStream);
var
Count: Longint;
begin
Stream.Position := 0;
Count := Size;
if Count <> 0 then
Stream.ReadBuffer(Memory^, Count);
end;

function TMemStream.ReadString(const Count: Integer): String;
var
ts: String;
Len: Integer;
begin
if Count > 0 then
begin
SetLength(ts, Count);
Len := Read(ts[1], Count);
Result := Copy(ts, 1, Len);
end else
Result := '';
end;

function TMemStream.Write(const Buffer;
Count: Integer): Longint;
var
Pos: Longint;
Num: Longint;
begin
if (Position >= 0) and (Count >= 0) then
begin
Pos := Position + Count;
if Pos > 0 then
begin
if Pos > Size then
Pos := Size;
Num := Pos - Position;
if Num > 0 then
begin
System.Move(Buffer, Pointer(Longint(Memory) + Position)^, Num);
Position := Pos;
Result := Num;
Exit;
end;
end;
end;
Result := 0;
end;

function TMemStream.WriteString(const S: String): Longint;
begin
if Length(S) > 0 then
Result := Write(S[1], Length(S))
else
Result := 0;
end;

end.
 
to Richard3000,
我的程序稍加改动就可以满足你的需要,
写操作可以在主线程里完成,因为他很快,不需要等待的
下面是我的程序
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, USync, SyncObjs, Contnrs;
type
TBalance=record
BalanID:String[2];//秤号:2位ASCII码
SerialID:String;//流水号:5位ASCII码
Weight:Double;//重量
WriteTime:Double;//最后一次访问时间
//后面的变量就没用了
isBusy:Boolean;
end;

TBuffers = array of TBalance;
TDataProcessor = class(TThread)
private
{ Private declarations }
protected
procedure Execute;
override;
public
SyncEmpty: TSemaphore;
SyncFull: TSemaphore;
MutexAccess: TCriticalSection;
Queue: TQueue;
FreeBuff: TStack;
Buffers: TBuffers;
//用于显示数据
ListBox: TListBox;
vMsg: String;
procedure DispMsg;
end;

TForm1 = class(TForm)
Button1: TButton;
ListBox1: TListBox;
procedure FormCreate(Sender: TObject);
procedure FormDestroy(Sender: TObject);
procedure Button1Click(Sender: TObject);
private
{ Private declarations }
public
SyncEmpty: TSemaphore;
SyncFull: TSemaphore;
MutexAccess: TCriticalSection;
Queue: TQueue;
FreeBuff: TStack;
Buffers: TBuffers;
Worker: TDataProcessor;
end;

var
Form1: TForm1;
implementation
{$R *.dfm}
{ TDataProcessor }
procedure TDataProcessor.DispMsg;
begin
ListBox.Items.Add(vMsg);
end;

procedure TDataProcessor.Execute;
var
n: Integer;
t: TBalance;
begin
while not Terminateddo
begin
//等待缓冲区中加入数据,类型转换仅仅是去掉编译警告
SyncFull.Get(Integer(INFINITE));
//无限等待
//为了使线程在程序退出时能够正常退出
if Terminated then
Exit;
//锁定临界区
MutexAccess.Enter;
//此时已经等到了,从队列中取数据索引
n := Integer(Queue.Pop);
//此时处理数据,为了不妨碍其它线程及时访问全局资源
//应当及早的解锁,否则主线程将被锁定,采用的方法是
//复制到临时变量,马上解锁,注意,锁定的时间应尽可能的短
t := Buffers[n];
//登记为空闲,入空闲栈
FreeBuff.Push(Ptr(n));
//立刻临界区
MutexAccess.Leave;
//空闲空间加1,如果其他线程在等待空间,则会被唤醒,继续执行
SyncEmpty.Release(1);
//此时可以对数据进行处理,此处仅仅显示数据
//你可以进行入库或其他耗时的数据处理
vMsg := t.BalanID + ',' + t.SerialID + ' ' +
Format('%9.2f', [t.Weight]) + ' ' +
'Time: ' + DateTimeToStr(t.WriteTime);
//访问主线程的VCL对象时必须这样,如果访问数据库,
//则必须有一个线程安全的数据库链接,如果是BDE,则必须使用独立的会话
Sleep(1000);
//为了模仿某种慢速操作,延时1秒钟
//但是,有一点需要记住: 后面这句代码是通过主线程完成的
//如果主线程被阻塞,那就意味着 死锁 ,程序将完全锁死
//但对于不掉用Synchronize的其他操作无妨
Synchronize(DispMsg);
//此处处理完毕,进入下一次循环,所以的等待和同步都是自动进行的
//这都要归功于操作系统提供的互斥和同步对象
end;
end;

procedure TForm1.FormCreate(Sender: TObject);
var
i: Integer;
//s: String;
begin
//创建同步用的信号量
SyncEmpty := TSemaphore.Create('SyncEmpty002', 100, 100);
//假设有100个缓冲空间
SyncFull := TSemaphore.Create('SyncEmpty108', 0, 100);
//注意,两个信号量名字不能相同
//也可以这样创建
//UniqueString(s);
//生成不重复字符串
//SyncEmpty := TSemaphore.Create(s, 100, 100);
//假设有100个缓冲空间
//UniqueString(s);
//SyncFull := TSemaphore.Create(s, 0, 100);
//注意,两个信号量名字不能相同
//创建临界区
MutexAccess := TCriticalSection.Create;
//创建队列
Queue := TQueue.Create;
//创建空闲空间栈,自行进行内存分配
FreeBuff := TStack.Create;
//分配足够的动态空间
SetLength(Buffers, 120 + 1);
//降所有空间入栈,空闲一个首位置
for i := 120do
wnto 1do
FreeBuff.Push(Ptr(i));
//创建线程对象
Worker := TDataProcessor.Create(True);
//初始化线程变量
Worker.SyncEmpty := SyncEmpty;
Worker.SyncFull := SyncFull;
Worker.MutexAccess := MutexAccess;
Worker.Queue := Queue;
Worker.FreeBuff := FreeBuff;
//动态数组的赋值不会引起分配内存
//而是两个变量引用同一数组
Worker.Buffers := Buffers;
//关联显示对象
Worker.ListBox := ListBox1;

Worker.Resume;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
//一定要先释放工作线程
//正常终止线程
Worker.Terminate;
//如果线程在等待,唤醒线程,并退出线程
SyncFull.Release(1);
//释放县城对象
Worker.Free;
SyncEmpty.Free;
SyncFull.Free;
MutexAccess.Free;
Queue.Free;
FreeBuff.Free;
SetLength(Buffers, 0);
end;

procedure TForm1.Button1Click(Sender: TObject);
var
n: Integer;
begin
//向缓冲区添加一个数据
SyncEmpty.Get(Integer(INFINITE));
//无限等待
//进入互斥访问临界区
MutexAccess.Enter;
//获得一个空闲内存块
n := Integer(FreeBuff.Pop);
//进行赋值
Buffers[n].BalanID := Format('%.2d', [Random(100)]);
Buffers[n].SerialID := Format('%.5d', [Random(100000)]);
Buffers[n].Weight := Random * 100;
Buffers[n].WriteTime := Now;
Queue.Push(Ptr(n));
//注意: 对公共对象的访问(非VCL组件或控件)必须在临界区中访问
//并尽量使访问的时间(锁定的时间)最短
//离开临界区
MutexAccess.Leave;
//通知工作线程
//上面可一次加入多个元素,但是保证SyncFull不会出界
SyncFull.Release(1);
end;

end.
 
我的方法稳定可靠,已通过测试
注意,访问ADO时,创建单独的数据库连接
 
李兄:
  你的例子我看了,但是我原来没有接触过线程,不太懂!现在在看书,不过我的程序也
等着要用![:(][:(][:(]
  访问ADO时,创建单独的数据库连接。是不是要动态创建一个TADOConnection?
  我没有用数组,而是TObjectQueue来保存从电子秤读来的数据。读、写TObjectQueue时
该怎么用信号量或者临界区控制?
 
简单说一下消费者和生产者的问题
同步和互斥需要两个信号量和一个互斥量(在同一个进程中用临界区更好)
s1,s2表示两个信号量,信号量是一个计数器,对于信号量有两种操作
即p操作 和 v操作 在我的程序中对应为 Get 和 Release
前者执行时会将计数器减一,但当计数器为0时,此操作将被操作系统挂起
直到此计数被别人加一(另一个线程调用此信号量的Release方法),则被唤醒
两个线程实际上是互相阻塞互相唤醒,来配合工作的
同时信号量还可以起到缓冲区计数的作用
两个线程访问共享资源时,必须互斥访问,临界区的进入和离开必须在最内层
即必须在同步操作的更内层
你的队列属于共享资源,所以必须锁定后才能访问
 
比较的来说,如果是一个进程内的同步,用临界区就行了,用互斥比较消耗资源
不信你可以做一下测试,用信号更加消耗资源,信号量控制同时操作同一资源的
线程数量...
不过我仔细想了想,用事件Event不错,对这个应用和互斥比较适合,尤其是事件
 
无忌兄,上MSN吧。我快急死了!
 
李兄:
  为什么“注意,访问ADO时,创建单独的数据库连接”?是不是要动态创建一个TADOConnection?
为什么?
  谢谢!
 
多谢各位大侠的帮忙!
现在问题已经差不多解决了,只有一点CloseHandle(hMutex)时出错(内存地址错)。不知道
这是为什么?希望大侠们继续帮忙!
 

Similar threads

后退
顶部