多线程,共享结构数组问题!在线等待...高分相送(100分)

  • 主题发起人 主题发起人 syy_sxd
  • 开始时间 开始时间
S

syy_sxd

Unregistered / Unconfirmed
GUEST, unregistred user!
TRecvInfo = packed record
Total_Length : integer;
Command_ID : integer;
Sequence_ID : integer;
Msg_Id : int64;
Destnation_id : array [0..21 - 1] of Char;
Service_id : array [0..10 - 1] of Char;
TP_pid : byte;
TP_udhi : byte;
Msg_Fmt : byte;
Srcterminal_id : array [0..21 - 1] of Char;
Registered_Delivery : byte;
Msg_length : byte;
Msg_Content : array [0..160 - 1] of Char;
Reserve : array [0..8 - 1] of Char;
flag:byte;//是否被处理的标志
end;
主线程,定义一结构型数组
RecvMem:array[1..100] of TRecvInfo;
主线程用数组来报存接收的数据,然后经过数据库判断等线程处理后,用发送线程来发送
请问,1.接收的快,处理的慢,怎么设计呢
2.主线程,处理线程共享结构数组,怎么解决!
 
你的想法很好,也是对的
要用临界处理,防止数组同时被两个线程写!
临界的用法看一下帮助,有例子的
 
使用信号量和互斥量来同步访问,
这两个东西就是用来做这个的,可以提高效率,减小资源占用
 
接收的快,处理的慢,怎么设计呢?
用互斥可以实现同步,可要等一个线程把数租添慢,另外一个线程才可以处理,
这样和单线程没有区别啊。怎样才能实现多个线程同步处理数组呢
 
请帮忙看看代码,为何接收线程只运行一次呢,我把代码给简化了
unit MainUnit;
type
TMainForm = class(TForm)
const
Server:string[15]='192.168.0.7';

var
MainForm: TMainForm;
hMutex1:THandle=0;
implementation
uses Data, SendThds, RecvThds;
procedure TMainForm.BitBtn1Click(Sender: TObject);
begin

hMutex2:=CreateMutex(nil,false,nil);
TRecvThread.Create(DataModule1.ADOQuery1,RecInfo^);

end;

procedure TMainForm.FormCreate(Sender: TObject);
begin
new(RecInfo);
end;
procedure TMainForm.ClientSocketRead(Sender: TObject;
Socket: TCustomWinSocket);
begin
RecInfo2:=Pointer(MainForm.ServerSocket1.socket.ReceiveText);
end;

接收线程
type
TRecvInfo = packed record
Total_Length : integer;
Command_ID : integer;
Sequence_ID : integer;
Msg_Id : int64;
Destnation_id : array [0..21 - 1] of Char;
Service_id : array [0..10 - 1] of Char;
TP_pid : byte;
TP_udhi : byte;
Msg_Fmt : byte;
Srcterminal_id : array [0..21 - 1] of Char;
Registered_Delivery : byte;
Msg_length : byte;
Msg_Content : array [0..160 - 1] of Char;
Reserve : array [0..8 - 1] of Char;
flag:byte;//是否被处理的标志 1处理完,0为处理
end;
type
TRecvThread = class(TThread)
private
{ Private declarations }
ThdsAdoQuery: TAdoQuery;
protected
procedure Execute;
override;
proceduredo
ne;
public
constructor Create(adoquery:TAdoQuery;RecInfo1:TRecvInfo);
destructor Destroy;
override;
end;

const
MaxSize=10;
var
NextNumber:integer=1;
RecvMem:array[1..MaxSize] of TRecvInfo;
RecInfo,RecInfo2:^TRecvInfo;
implementation
uses MainUnit;
{ Important: Methods and properties of objects in VCL or CLX can only be used
in a method called using Synchronize, for example,
Synchronize(UpdateCaption);
and UpdateCaption could look like,
procedure RecvThds.UpdateCaption;
begin
Form1.Caption := 'Updated in a thread';
end;
}
{ RecvThds }
destructor TRecvThread.Destroy;
begin
{dispose(fptr_msg_head);
dispose(f_submit);
dispose(f_connect);
form1.recreate;
}
CloseHandle(hMutex2);
end;
constructor TRecvThread.Create(adoquery:TAdoQuery;RecInfo1:TRecvInfo);
begin
inherited Create(false);
RecInfo:=@RecInfo1;
ThdsAdoQuery:=adoquery;
//Resume;
end;
procedure TRecvThread.Done;
begin
if (RecvMem[NextNumber].flag=0) and (RecvMem[nextnumber].Msg_Content<>'') then
begin
RecvMem[NextNumber].Total_Length:=RecInfo^.Total_Length;
RecvMem[NextNumber].Command_ID:=RecInfo^.Command_ID;
RecvMem[NextNumber].Sequence_ID:=RecInfo^.Sequence_ID;
RecvMem[NextNumber].msg_id:=RecInfo^.msg_id;
RecvMem[NextNumber].Destnation_id:=RecInfo^.Destnation_id;
RecvMem[NextNumber].Service_id:=RecInfo^.Service_id;
RecvMem[NextNumber].TP_pid:=RecInfo^.TP_pid;
RecvMem[NextNumber].TP_udhi:=RecInfo^.TP_udhi;
RecvMem[NextNumber].Msg_Fmt:=RecInfo^.Msg_Fmt;
RecvMem[NextNumber].Srcterminal_id:=RecInfo^.Srcterminal_id;
RecvMem[NextNumber].Registered_Delivery:=RecInfo^.Registered_Delivery;
RecvMem[NextNumber].Msg_length:=RecInfo^.Msg_length;
RecvMem[NextNumber].Msg_Content:=RecInfo^.Msg_Content;
RecvMem[NextNumber].Reserve:=RecInfo^.Reserve;
RecvMem[NextNumber].flag:=1;
if NextNumber=MaxSize then
begin
NextNumber:=0;
end;
Inc(NextNumber);
end;
end;
procedure TRecvThread.Execute;
begin
{ Place thread code here }
FreeOnTerminate:=True;
if WaitForSingleObject(hMutex2,INFINITE)=WAIT_OBJECT_0 then
begin
synchronize(Done);
end;
ReleaseMutex(hMutex2);
end;

end.
 
了解一下信号量的概念吧
生产者和消费者的经典问题
http://www.glcat.edu.cn/bmzy/dept2/czxt/3.htm
 
一个进程间同步和通信的工具单元,改进的,
同样可以用于线程
unit USync;
interface
uses
SysUtils, Classes, Windows;
type
TSync = class(TObject)
protected
FHandle: THandle;
public
destructor Destroy;
override;
property Handle: THandle read FHandle;
end;

TMutex = class(TSync)
public
constructor Create(const Name: string);
function Get(TimeOut: Integer): Boolean;
function Release: Boolean;
end;

TEvent = class(TSync)
public
constructor Create(const Name: string;
Manual: Boolean);
function Wait(TimeOut: Integer): Boolean;
procedure Signal;
procedure Reset;
end;

TSemaphore = class(TSync)
public
constructor Create(const Name: string;
Initial, Maxinum: Integer);
function Release(Count: Integer;
var PreviousCount: Integer): Boolean;
overload;
function Release(Count: Integer): Boolean;
overload;
function Get(TimeOut: Integer): Boolean;
end;

TFileMap = class(TSync)
private
FCreated: Boolean;
FSize: Integer;
FFileView: Pointer;
FName: string;
public
constructor Create(const Name: string;
Size: Integer);
constructor Open(const Name: string);
destructor Destroy;
override;
property Name: string read FName;
property Size: Integer read FSize;
property Buffer: Pointer read FFileView;
property Created: Boolean read FCreated;
end;

TMemStream = class(TCustomMemoryStream)
private
Owned: Boolean;
public
destructor Destroy;
override;
procedure Clear;
procedure Attach(PBuff: Pointer;
BuffSize: Integer);
procedure AttachOwnFree(PBuff: Pointer;
BuffSize: Integer);
procedure LoadFromStream(Stream: TStream);
function Write(const Buffer;
Count: Longint): Longint;
override;
function WriteString(const S: String): Longint;
function ReadString(const Count: Integer): String;
end;

implementation
procedure Error(const Msg: string);
begin
raise Exception.Create(Msg);
end;

{ TSync }
destructor TSync.Destroy;
begin
if FHandle <> 0 then
CloseHandle(FHandle);
end;

{ TMutex }
constructor TMutex.Create(const Name: string);
begin
FHandle := CreateMutex(nil, False, PChar(Name));
if FHandle = 0 then
abort;
end;

function TMutex.Get(TimeOut: Integer): Boolean;
begin
Result := WaitForSingleObject(FHandle, TimeOut) = WAIT_OBJECT_0;
end;

function TMutex.Release: Boolean;
begin
Result := ReleaseMutex(FHandle);
end;

{ TEvent }
constructor TEvent.Create(const Name: string;
Manual: Boolean);
begin
FHandle := CreateEvent(nil, Manual, False, PChar(Name));
if FHandle = 0 then
abort;
end;

function TEvent.Wait(TimeOut: Integer): Boolean;
begin
Result := WaitForSingleObject(FHandle, TimeOut) = WAIT_OBJECT_0;
end;

procedure TEvent.Signal;
begin
SetEvent(FHandle);
end;

procedure TEvent.Reset;
begin
ResetEvent(FHandle);
end;

{ TSemaphore }
constructor TSemaphore.Create(const Name: string;
Initial,
Maxinum: Integer);
begin
FHandle := CreateSemaphore(nil, Initial, Maxinum, PChar(Name));
if FHandle = 0 then
abort;
end;

function TSemaphore.Get(TimeOut: Integer): Boolean;
begin
Result := WaitForSingleObject(FHandle, TimeOut) = WAIT_OBJECT_0;
end;

function TSemaphore.Release(Count: Integer): Boolean;
begin
Result := ReleaseSemaphore(Handle, Count, nil);
end;

function TSemaphore.Release(Count: Integer;
var PreviousCount: Integer): Boolean;
begin
Result := ReleaseSemaphore(Handle, Count, @PreviousCount);
end;

{ TFileMap }
constructor TFileMap.Create(const Name: string;
Size: Integer);
begin
try
FName := Name;
FSize := Size;
{ CreateFileMapping, when called with $FFFFFFFF for the hanlde value,
creates a region of shared memory }
FHandle := CreateFileMapping($FFFFFFFF, nil, PAGE_READWRITE, 0,
Size, PChar(Name));
if FHandle = 0 then
abort;
FCreated := GetLastError = 0;
{ We still need to map a pointer to the handle of the shared memory region }
FFileView := MapViewOfFile(FHandle, FILE_MAP_WRITE, 0, 0, Size);
if FFileView = nil then
abort;
except
Error(Format('创建内存映射文件失败, %s (%d)', [Name, GetLastError]));
end;
end;

destructor TFileMap.Destroy;
begin
if FFileView <> nil then
UnmapViewOfFile(FFileView);
inherited;
end;

constructor TFileMap.Open(const Name: string);
begin
try
FName := Name;
FSize := -1;
{ CreateFileMapping, when called with $FFFFFFFF for the hanlde value,
creates a region of shared memory }
FHandle := OpenFileMapping(0, True, PChar(Name));
if FHandle = 0 then
abort;
FCreated := GetLastError = 0;
{ We still need to map a pointer to the handle of the shared memory region }
FFileView := MapViewOfFile(FHandle, FILE_MAP_WRITE, 0, 0, Size);
if FFileView = nil then
abort;
except
Error(Format('创建内存映射文件失败, %s (%d)', [Name, GetLastError]));
end;
end;

{ TMemStream }
procedure TMemStream.Attach(PBuff: Pointer;
BuffSize: Integer);
begin
Clear;
SetPointer(PBuff, BuffSize);
Owned := False;
end;

procedure TMemStream.AttachOwnFree(PBuff: Pointer;
BuffSize: Integer);
begin
Clear;
SetPointer(PBuff, BuffSize);
Owned := True;
end;

procedure TMemStream.Clear;
begin
if Owned then
FreeMem(Memory);
end;

destructor TMemStream.Destroy;
begin
Clear;
inherited;
end;

procedure TMemStream.LoadFromStream(Stream: TStream);
var
Count: Longint;
begin
Stream.Position := 0;
Count := Size;
if Count <> 0 then
Stream.ReadBuffer(Memory^, Count);
end;

function TMemStream.ReadString(const Count: Integer): String;
var
ts: String;
Len: Integer;
begin
if Count > 0 then
begin
SetLength(ts, Count);
Len := Read(ts[1], Count);
Result := Copy(ts, 1, Len);
end else
Result := '';
end;

function TMemStream.Write(const Buffer;
Count: Integer): Longint;
var
Pos: Longint;
Num: Longint;
begin
if (Position >= 0) and (Count >= 0) then
begin
Pos := Position + Count;
if Pos > 0 then
begin
if Pos > Size then
Pos := Size;
Num := Pos - Position;
if Num > 0 then
begin
System.Move(Buffer, Pointer(Longint(Memory) + Position)^, Num);
Position := Pos;
Result := Num;
Exit;
end;
end;
end;
Result := 0;
end;

function TMemStream.WriteString(const S: String): Longint;
begin
if Length(S) > 0 then
Result := Write(S[1], Length(S))
else
Result := 0;
end;

end.
 
楼主的代码,和用不用多线程一个样,
多线程内部一般是一个死循环
如此:
while not Terminateddo
begin
{do
your things }
end;

如果使用了synchronize(Done);
的方法,调用Done,实际上是让主线程来做这些事情的
你的线程在等待主线程完成,这样和单线程没多大区别的

 
互斥,则可以用
TCriticalSection 对象
在Unit
SyncObjs 中定义
 
LiChaoHui,能不能说得更细些,我刚接触多线程,还望各位大侠多帮忙阿
 
procedure TRecvThread.Execute;
begin
{ Place thread code here }
FreeOnTerminate:=True;
if WaitForSingleObject(hMutex2,INFINITE)=WAIT_OBJECT_0 then
begin
synchronize(Done);
end;
ReleaseMutex(hMutex2);
end;
你的Execute中只不过有一次执行而已,应该用循环:
procedure TRecvThread.Execute;
begin
{ Place thread code here }
FreeOnTerminate:=True;
while not Terminateddo
begin
if WaitForSingleObject(hMutex2,INFINITE)=WAIT_OBJECT_0 then
begin
synchronize(Done);
end;
end;
ReleaseMutex(hMutex2);
end;

其他的正如LiChaoHui说的。
 
to DarwinZhang
现在是不停执行接收线程的Done事件了,可主线程的ClientSocketRead又只执行一次了?
 
注意,如果线程中需要调用VCL,则需通过线程的Synchronize方法,间接调用主线程
来执行你的代码,
如果要访问全局共享资源,在同一个进程中,用TCriticalSection可以获得较高的效率
进程之间则需要用TMutex对象,我上面的工具单元有这个对象
如果是共享一个缓冲区队列,则需要用到信号量来进行同步,
信号量就是为了解决这种问题而产生的,上面的工具单元中的类TSemaphore即是
这种问题属于典型的 生产者-消费者 问题,操作系统的教材中作了详细的介绍
用上面的对象 Semaphore 和 CriticalSection
Semaphore.创建时需指定初始值和最大值,
对于生产者来说,信号量计数代表空闲的缓冲区个数,如果为0,则线程应被阻塞
对于消费者来说,信号量计数代表可用数据的个数,如果为0,则线程应被阻塞
所以需要两个信号量配合起来才能工作,创建的参数如下:(假设缓冲区大小为20)
初始值为20,表示有20个空闲位置
Semaphore1 := TSemaphore.Create('buffer_semaphore_01', 20, 20);
初始值为0,表示有0个可用数据,如果刚开始就调用Get,因为没有数据,则线程被阻塞
直到生产者调用了Semaphore2.Release(1),此时,线程自动被唤醒,并且缓冲区中已有数据
Semaphore2 := TSemaphore.Create('buffer_semaphore_01', 0, 20);
生产者访问缓冲队列的次序为:
Semaphore1.Get(INFINITE);
//无限期等待
CriticalSection.Enter;
//此处往缓冲队列中加入数据
CriticalSection.Leave;
Semaphore2.Release(1);
消费者访问缓冲队列的次序为:
Semaphore2.Get(INFINITE);
//无限期等待
CriticalSection.Enter;
//此处从缓冲队列中取数据
CriticalSection.Leave;
Semaphore1.Release(1);
这样就完成了线程同步和互斥访问共享资源的任务,
如果不能理解,请找一本操作系统的教材,仔细地看一下进程的同步和互斥 相关的内容吧
如果找不到教材,则可以看一下,网上的教材
http://www.glcat.edu.cn/bmzy/dept2/czxt/3.htm
其中的 P 操作相当于Enter 或者 Get
V 操作相当于 Release 或者 Leave
他也使用了两个信号量用来同步 empty 和 full
还用了一个信号量来完成互斥 mutex
互斥量是信号量的特例, 就像整数是实数的特例一样
 
谢谢LiChaoHui,谢谢各位!真的了解些这方面的基础知识啊!可项目急啊
 
to LiChaoHui
能不能有个WaitForMultipleObjects,互斥,多线程的例子
接收,发送线程对RecvMem:array[1..MaxSize] of TRecvInfo类型的数组,怎么同步操作
 
to LiChaoHui
Semaphore1 := TSemaphore.Create('buffer_semaphore_01', 20, 20);
Semaphore2 := TSemaphore.Create('buffer_semaphore_01', 0, 20);
1。20是缓冲队列的字节数吗,怎样往缓冲队列中加入数据
2 我是用结构型数组来做处理收发数据,怎么和缓冲队列对应呢

 

Similar threads

后退
顶部