请求前辈给一个线程池的demo(100)

  • 主题发起人 lc4148700
  • 开始时间
L

lc4148700

Unregistered / Unconfirmed
GUEST, unregistred user!
代码:
unit uThreadPool;{   aPool.AddRequest(TMyRequest.Create(RequestParam1, RequestParam2, ...)); }interfaceuses  Windows,  Classes;// 是否记录日志// {$DEFINE NOLOGS}type  TCriticalSection = class(TObject)  protected    FSection: TRTLCriticalSection;  public    constructor Create;    destructor Destroy; override;    // 进入临界区    procedure Enter;    // 离开临界区    procedure Leave;    // 尝试进入    function TryEnter: Boolean;  end;type  // 储存请求数据的基本类  TWorkItem = class(TObject)  public    // 是否有重复任务    function IsTheSame(DataObj: TWorkItem): Boolean; virtual;    // 如果 NOLOGS 被定义,则禁用。    function TextForLog: string; virtual;  end;type  TThreadsPool = class;  //线程状态  TThreadState = (tcsInitializing, tcsWaiting, tcsGetting, tcsProcessing,    tcsProcessed, tcsTerminating, tcsCheckingDown);  // 工作线程仅用于线程池内, 不要直接创建并调用它。  TProcessorThread = class(TThread)  private    // 创建线程时临时的Event对象, 阻塞线程直到初始化完成    hInitFinished: THandle;    // 初始化出错信息    sInitError: string;    // 记录日志    procedure WriteLog(const Str: string; Level: Integer = 0);  protected    // 线程临界区同步对像    csProcessingDataObject: TCriticalSection;    // 平均处理时间    FAverageProcessing: Integer;    // 等待请求的平均时间    FAverageWaitingTime: Integer;    // 本线程实例的运行状态    FCurState: TThreadState;    // 本线程实例所附属的线程池    FPool: TThreadsPool;    // 当前处理的数据对像。    FProcessingDataObject: TWorkItem;    // 线程停止 Event, TProcessorThread.Terminate 中开绿灯    hThreadTerminated: THandle;    uProcessingStart: DWORD;    // 开始等待的时间, 通过 GetTickCount 取得。    uWaitingStart: DWORD;    // 计算平均工作时间    function AverageProcessingTime: DWORD;    // 计算平均等待时间    function AverageWaitingTime: DWORD;    procedure Execute; override;    function IamCurrentlyProcess(DataObj: TWorkItem): Boolean;    // 转换枚举类型的线程状态为字串类型    function InfoText: string;    // 线程是否长时间处理同一个请求?(已死掉?)    function IsDead: Boolean;    // 线程是否已完成当成任务    function isFinished: Boolean;    // 线程是否处于空闲状态    function isIdle: Boolean;    // 平均值校正计算。    function NewAverage(OldAvg, NewVal: Integer): Integer;  public    Tag: Integer;    constructor Create(APool: TThreadsPool);    destructor Destroy; override;    procedure Terminate;  end;  // 线程初始化时触发的事件  TProcessorThreadInitializing = procedure(Sender: TThreadsPool; aThread:    TProcessorThread) of object;  // 线程结束时触发的事件  TProcessorThreadFinalizing = procedure(Sender: TThreadsPool; aThread:    TProcessorThread) of object;  // 线程处理请求时触发的事件  TProcessRequest = procedure(Sender: TThreadsPool; WorkItem: TWorkItem;    aThread: TProcessorThread) of object;  TEmptyKind = (    ekQueueEmpty, //任务被取空后    ekProcessingFinished // 最后一个任务处理完毕后    );  // 任务队列空时触发的事件  TQueueEmpty = procedure(Sender: TThreadsPool; EmptyKind: TEmptyKind) of    object;  TThreadsPool = class(TComponent)  private    csQueueManagment: TCriticalSection;    csThreadManagment: TCriticalSection;    FProcessRequest: TProcessRequest;    FQueue: TList;    FQueueEmpty: TQueueEmpty;    // 线程超时阀值    FThreadDeadTimeout: DWORD;    FThreadFinalizing: TProcessorThreadFinalizing;    FThreadInitializing: TProcessorThreadInitializing;    // 工作中的线程    FThreads: TList;    // 执行了 terminat 发送退出指令, 正在结束的线程.    FThreadsKilling: TList;    // 最少, 最大线程数    FThreadsMax: Integer;    // 最少, 最大线程数    FThreadsMin: Integer;    // 池平均等待时间    function PoolAverageWaitingTime: Integer;    procedure WriteLog(const Str: string; Level: Integer = 0);  protected    FLastGetPoint: Integer;    // Semaphore, 统计任务队列    hSemRequestCount: THandle;    // Waitable timer. 每30触发一次的时间量同步    hTimCheckPoolDown: THandle;    // 线程池停机(检查并清除空闲线程和死线程)    procedure CheckPoolDown;    // 清除死线程,并补充不足的工作线程    procedure CheckThreadsForGrow;    procedure DoProcessed;    procedure DoProcessRequest(aDataObj: TWorkItem; aThread: TProcessorThread);      virtual;    procedure DoQueueEmpty(EmptyKind: TEmptyKind); virtual;    procedure DoThreadFinalizing(aThread: TProcessorThread); virtual;    // 执行事件    procedure DoThreadInitializing(aThread: TProcessorThread); virtual;    // 释放 FThreadsKilling 列表中的线程    procedure FreeFinishedThreads;    // 申请任务    procedure GetRequest(out Request: TWorkItem);    // 清除死线程    procedure KillDeadThreads;  public    constructor Create(AOwner: TComponent); override;    destructor Destroy; override;    // 就进行任务是否重复的检查, 检查发现重复就返回 False    function AddRequest(aDataObject: TWorkItem; CheckForDoubles: Boolean =      False): Boolean; overload;    // 转换枚举类型的线程状态为字串类型    function InfoText: string;  published    // 线程处理任务时触发的事件    property OnProcessRequest: TProcessRequest read FProcessRequest write      FProcessRequest;    // 任务列表为空时解发的事件    property OnQueueEmpty: TQueueEmpty read FQueueEmpty write FQueueEmpty;    // 线程结束时触发的事件    property OnThreadFinalizing: TProcessorThreadFinalizing read      FThreadFinalizing write FThreadFinalizing;    // 线程初始化时触发的事件    property OnThreadInitializing: TProcessorThreadInitializing read      FThreadInitializing write FThreadInitializing;    // 线程超时值(毫秒), 如果处理超时,将视为死线程    property ThreadDeadTimeout: DWORD read FThreadDeadTimeout write      FThreadDeadTimeout default 0;    // 最大线程数    property ThreadsMax: Integer read FThreadsMax write FThreadsMax default 1;    // 最小线程数    property ThreadsMin: Integer read FThreadsMin write FThreadsMin default 0;  end;type  //日志记志函数  TLogWriteProc = procedure(    const Str: string; //日志    LogID: Integer = 0;    Level: Integer = 0 //Level = 0 - 跟踪信息, 10 - 致命错误    );var  WriteLog: TLogWriteProc; // 如果存在实例就写日志implementationuses  SysUtils;// 储存请求数据的基本类{********************************** TWorkItem ***********************************}function TWorkItem.IsTheSame(DataObj: TWorkItem): Boolean;begin  Result := False;end; { TWorkItem.IsTheSame }function TWorkItem.TextForLog: string;begin  Result := 'Request';end; { TWorkItem.TextForLog }{********************************* TThreadsPool *********************************}constructor TThreadsPool.Create(AOwner: TComponent);var  DueTo: Int64;begin{$IFNDEF NOLOGS}  WriteLog('创建线程池', 5);{$ENDIF}  inherited;  csQueueManagment := TCriticalSection.Create;  FQueue := TList.Create;  csThreadManagment := TCriticalSection.Create;  FThreads := TList.Create;  FThreadsKilling := TList.Create;  FThreadsMin := 0;  FThreadsMax := 1;  FThreadDeadTimeout := 0;  FLastGetPoint := 0;  //  hSemRequestCount := CreateSemaphore(nil, 0, $7FFFFFFF, nil);  DueTo := -1;  //可等待的定时器(只用于Window NT4或更高)  hTimCheckPoolDown := CreateWaitableTimer(nil, False, nil);  if hTimCheckPoolDown = 0 then // Win9x不支持    // In Win9x number of thread will be never decrised    hTimCheckPoolDown := CreateEvent(nil, False, False, nil)  else    SetWaitableTimer(hTimCheckPoolDown, DueTo, 30000, nil, nil, False);end; { TThreadsPool.Create }destructor TThreadsPool.Destroy;var  n, i: Integer;  Handles: array of THandle;begin{$IFNDEF NOLOGS}  WriteLog('线程池销毁', 5);{$ENDIF}  csThreadManagment.Enter;  SetLength(Handles, FThreads.Count);  n := 0;  for i := 0 to FThreads.Count - 1 do    if FThreads[i] <> nil then    begin      Handles[n] := TProcessorThread(FThreads[i]).Handle;      TProcessorThread(FThreads[i]).Terminate;      Inc(n);    end;  csThreadManagment.Leave;  // lixiaoyu 添加于 2009.1.6,如没有此行代码无法成功释放正在执行中的工作者线程,死锁。  WaitForMultipleObjects(n, @Handles[0], True, 30000);  // 等待工作者线程执行终止  lixiaoyu 注释于 2009.1.6  csThreadManagment.Enter;  // lixiaoyu 添加于 2009.1.6 再次进入锁定,并释放资源  for i := 0 to FThreads.Count - 1 do    TProcessorThread(FThreads[i]).Free;  FThreads.Free;  FThreadsKilling.Free;  csThreadManagment.Free;  csQueueManagment.Enter;  for i := FQueue.Count - 1 downto 0 do    TObject(FQueue[i]).Free;  FQueue.Free;  csQueueManagment.Free;  CloseHandle(hSemRequestCount);  CloseHandle(hTimCheckPoolDown);  inherited;end; { TThreadsPool.Destroy }function TThreadsPool.AddRequest(aDataObject: TWorkItem; CheckForDoubles:  Boolean = False): Boolean;var  i: Integer;begin{$IFNDEF NOLOGS}  WriteLog('AddRequest(' + aDataObject.TextForLog + ')', 2);{$ENDIF}  Result := False;  csQueueManagment.Enter;  try    // 如果 CheckForDoubles = TRUE    // 则进行任务是否重复的检查    if CheckForDoubles then      for i := 0 to FQueue.Count - 1 do        if (FQueue[i] <> nil)          and aDataObject.IsTheSame(TWorkItem(FQueue[i])) then          Exit; // 发现有相同的任务    csThreadManagment.Enter;    try      // 清除死线程,并补充不足的工作线程      CheckThreadsForGrow;      // 如果 CheckForDoubles = TRUE      // 则检查是否有相同的任务正在处理中      if CheckForDoubles then        for i := 0 to FThreads.Count - 1 do          if TProcessorThread(FThreads[i]).IamCurrentlyProcess(aDataObject) then            Exit; // 发现有相同的任务    finally      csThreadManagment.Leave;    end;    //将任务加入队列    FQueue.Add(aDataObject);    //释放一个同步信号量    ReleaseSemaphore(hSemRequestCount, 1, nil);{$IFNDEF NOLOGS}    WriteLog('释放一个同步信号量)', 1);{$ENDIF}    Result := True;  finally    csQueueManagment.Leave;  end;{$IFNDEF NOLOGS}  //调试信息  WriteLog('增加一个任务(' + aDataObject.TextForLog + ')', 1);{$ENDIF}end; { TThreadsPool.AddRequest }{函 数 名:TThreadsPool.CheckPoolDown功能描述:线程池停机(检查并清除空闲线程和死线程)输入参数:无返 回 值: 无创建日期:2006.10.22 11:31修改日期:2006.作    者:Kook附加说明:}procedure TThreadsPool.CheckPoolDown;var  i: Integer;begin{$IFNDEF NOLOGS}  WriteLog('TThreadsPool.CheckPoolDown', 1);{$ENDIF}  csThreadManagment.Enter;  try{$IFNDEF NOLOGS}    WriteLog(InfoText, 2);{$ENDIF}    // 清除死线程    KillDeadThreads;    // 释放 FThreadsKilling 列表中的线程    FreeFinishedThreads;    // 如果线程空闲,就终止它    for i := FThreads.Count - 1 downto FThreadsMin do      if TProcessorThread(FThreads[i]).isIdle then      begin        //发出终止命令        TProcessorThread(FThreads[i]).Terminate;        //加入待清除队列        FThreadsKilling.Add(FThreads[i]);        //从工作队列中除名        FThreads.Delete(i);        //todo: ??        Break;      end;  finally    csThreadManagment.Leave;  end;end; { TThreadsPool.CheckPoolDown }{函 数 名:TThreadsPool.CheckThreadsForGrow功能描述:清除死线程,并补充不足的工作线程输入参数:无返 回 值: 无创建日期:2006.10.22 11:31修改日期:2006.作    者:Kook附加说明:}procedure TThreadsPool.CheckThreadsForGrow;var  AvgWait: Integer;  i: Integer;begin  {    New thread created if:    新建线程的条件:      1. 工作线程数小于最小线程数      2. 工作线程数小于最大线程数 and 线程池平均等待时间 < 100ms(系统忙)      3. 任务大于工作线程数的4倍  }  csThreadManagment.Enter;  try    KillDeadThreads;    if FThreads.Count < FThreadsMin then    begin{$IFNDEF NOLOGS}      WriteLog('工作线程数小于最小线程数', 4);{$ENDIF}      for i := FThreads.Count to FThreadsMin - 1 do      try        FThreads.Add(TProcessorThread.Create(Self));      except        on e: Exception do          WriteLog(            'TProcessorThread.Create raise: ' + e.ClassName + #13#10#9'Message: '            + e.Message,            9            );      end    end    else if FThreads.Count < FThreadsMax then    begin{$IFNDEF NOLOGS}      WriteLog('工作线程数小于最大线程数 and 线程池平均等待时间 < 100ms', 3);{$ENDIF}      AvgWait := PoolAverageWaitingTime;{$IFNDEF NOLOGS}      WriteLog(Format(        'FThreads.Count (%d)<FThreadsMax(%d), AvgWait=%d',        [FThreads.Count, FThreadsMax, AvgWait]),        4        );{$ENDIF}      if AvgWait < 100 then      try        FThreads.Add(TProcessorThread.Create(Self));      except        on e: Exception do          WriteLog(            'TProcessorThread.Create raise: ' + e.ClassName +            #13#10#9'Message: ' + e.Message,            9            );      end;    end;  finally    csThreadManagment.Leave;  end;end; { TThreadsPool.CheckThreadsForGrow }procedure TThreadsPool.DoProcessed;var  i: Integer;begin  if (FLastGetPoint < FQueue.Count) then    Exit;  csThreadManagment.Enter;  try    for i := 0 to FThreads.Count - 1 do      if TProcessorThread(FThreads[i]).FCurState in [tcsProcessing] then        Exit;  finally    csThreadManagment.Leave;  end;  DoQueueEmpty(ekProcessingFinished);end; { TThreadsPool.DoProcessed }procedure TThreadsPool.DoProcessRequest(aDataObj: TWorkItem; aThread:  TProcessorThread);begin  if Assigned(FProcessRequest) then    FProcessRequest(Self, aDataObj, aThread);end; { TThreadsPool.DoProcessRequest }procedure TThreadsPool.DoQueueEmpty(EmptyKind: TEmptyKind);begin  if Assigned(FQueueEmpty) then    FQueueEmpty(Self, EmptyKind);end; { TThreadsPool.DoQueueEmpty }procedure TThreadsPool.DoThreadFinalizing(aThread: TProcessorThread);begin  if Assigned(FThreadFinalizing) then    FThreadFinalizing(Self, aThread);end; { TThreadsPool.DoThreadFinalizing }procedure TThreadsPool.DoThreadInitializing(aThread: TProcessorThread);begin  if Assigned(FThreadInitializing) then    FThreadInitializing(Self, aThread);end; { TThreadsPool.DoThreadInitializing }{函 数 名:TThreadsPool.FreeFinishedThreads功能描述:释放 FThreadsKilling 列表中的线程输入参数:无返 回 值: 无创建日期:2006.10.22 11:34修改日期:2006.作    者:Kook附加说明:}procedure TThreadsPool.FreeFinishedThreads;var  i: Integer;begin  if csThreadManagment.TryEnter then  try    for i := FThreadsKilling.Count - 1 downto 0 do      if TProcessorThread(FThreadsKilling[i]).isFinished then      begin        TProcessorThread(FThreadsKilling[i]).Free;        FThreadsKilling.Delete(i);      end;  finally    csThreadManagment.Leave  end;end; { TThreadsPool.FreeFinishedThreads }{函 数 名:TThreadsPool.GetRequest功能描述:申请任务输入参数:out Request: TRequestDataObject返 回 值: 无创建日期:2006.10.22 11:34修改日期:2006.作    者:Kook附加说明:}procedure TThreadsPool.GetRequest(out Request: TWorkItem);begin{$IFNDEF NOLOGS}  WriteLog('申请任务', 2);{$ENDIF}  csQueueManagment.Enter;  try    //跳过空的队列元素    while (FLastGetPoint < FQueue.Count) and (FQueue[FLastGetPoint] = nil) do      Inc(FLastGetPoint);    Assert(FLastGetPoint < FQueue.Count);    //压缩队列,清除空元素    if (FQueue.Count > 127) and (FLastGetPoint >= (3 * FQueue.Count) div 4) then    begin{$IFNDEF NOLOGS}      WriteLog('FQueue.Pack', 1);{$ENDIF}      FQueue.Pack;      FLastGetPoint := 0;    end;    Request := TWorkItem(FQueue[FLastGetPoint]);    FQueue[FLastGetPoint] := nil;    inc(FLastGetPoint);    if (FLastGetPoint = FQueue.Count) then //如果队列中无任务    begin      DoQueueEmpty(ekQueueEmpty);      FQueue.Clear;      FLastGetPoint := 0;    end;  finally    csQueueManagment.Leave;  end;end; { TThreadsPool.GetRequest }function TThreadsPool.InfoText: string;begin  Result := '';  //end;  //{$ELSE}  //var  //  i: Integer;  //begin  //  csQueueManagment.Enter;  //  csThreadManagment.Enter;  //  try  //    if (FThreads.Count = 0) and (FThreadsKilling.Count = 1) and  //      TProcessorThread(FThreadsKilling[0]).isFinished then  //      FreeFinishedThreads;  //  //    Result := Format(  //      'Pool thread: Min=%d, Max=%d, WorkingThreadsCount=%d, TerminatedThreadCount=%d, QueueLength=%d'#13#10,  //      [ThreadsMin, ThreadsMax, FThreads.Count, FThreadsKilling.Count,  //      FQueue.Count]  //        );  //    if FThreads.Count > 0 then  //      Result := Result + 'Working threads:'#13#10;  //    for i := 0 to FThreads.Count - 1 do  //      Result := Result + TProcessorThread(FThreads[i]).InfoText + #13#10;  //    if FThreadsKilling.Count > 0 then  //      Result := Result + 'Terminated threads:'#13#10;  //    for i := 0 to FThreadsKilling.Count - 1 do  //      Result := Result + TProcessorThread(FThreadsKilling[i]).InfoText + #13#10;  //  finally  //    csThreadManagment.Leave;  //    csQueueManagment.Leave;  //  end;  //end;  //{$ENDIF}end; { TThreadsPool.InfoText }{函 数 名:TThreadsPool.KillDeadThreads功能描述:清除死线程输入参数:无返 回 值: 无创建日期:2006.10.22 11:32修改日期:2006.作    者:Kook附加说明:}procedure TThreadsPool.KillDeadThreads;var  i: Integer;begin  // Check for dead threads  if csThreadManagment.TryEnter then  try    for i := 0 to FThreads.Count - 1 do      if TProcessorThread(FThreads[i]).IsDead then      begin        // Dead thread moverd to other list.        // New thread created to replace dead one        TProcessorThread(FThreads[i]).Terminate;        FThreadsKilling.Add(FThreads[i]);        try          FThreads[i] := TProcessorThread.Create(Self);        except          on e: Exception do          begin            FThreads[i] := nil;{$IFNDEF NOLOGS}            WriteLog(              'TProcessorThread.Create raise: ' + e.ClassName +              #13#10#9'Message: ' + e.Message,              9              );{$ENDIF}          end;        end;      end;  finally    csThreadManagment.Leave  end;end; { TThreadsPool.KillDeadThreads }function TThreadsPool.PoolAverageWaitingTime: Integer;var  i: Integer;begin  Result := 0;  if FThreads.Count > 0 then  begin    for i := 0 to FThreads.Count - 1 do      Inc(result, TProcessorThread(FThreads[i]).AverageWaitingTime);    Result := Result div FThreads.Count  end  else    Result := 1;end; { TThreadsPool.PoolAverageWaitingTime }procedure TThreadsPool.WriteLog(const Str: string; Level: Integer = 0);begin{$IFNDEF NOLOGS}  uThreadPool.WriteLog(Str, 0, Level);{$ENDIF}end; { TThreadsPool.WriteLog }// 工作线程仅用于线程池内, 不要直接创建并调用它。{******************************* TProcessorThread *******************************}constructor TProcessorThread.Create(APool: TThreadsPool);begin  WriteLog('创建工作线程', 5);  inherited Create(True);  FPool := aPool;  FAverageWaitingTime := 1000;  FAverageProcessing := 3000;  sInitError := '';  {  各参数的意义如下:      参数一:填上 nil 即可。   参数二:是否采用手动调整灯号。   参数三:灯号的起始状态,False 表示红灯。   参数四:Event 名称, 对象名称相同的话,会指向同一个对象,所以想要有两个Event对象,便要有两个不同的名称(这名称以字符串来存.为NIL的话系统每次会自己创建一个不同的名字,就是被次创建的都是新的EVENT)。   传回值:Event handle。  }  hInitFinished := CreateEvent(nil, True, False, nil);  hThreadTerminated := CreateEvent(nil, True, False, nil);  csProcessingDataObject := TCriticalSection.Create;  try    WriteLog('TProcessorThread.Create::Resume', 3);    Resume;    //阻塞, 等待初始化完成    WaitForSingleObject(hInitFinished, INFINITE);    if sInitError <> '' then      raise Exception.Create(sInitError);  finally    CloseHandle(hInitFinished);  end;  WriteLog('TProcessorThread.Create::Finished', 3);end; { TProcessorThread.Create }destructor TProcessorThread.Destroy;begin  WriteLog('工作线程销毁', 5);  CloseHandle(hThreadTerminated);  csProcessingDataObject.Free;  inherited;end; { TProcessorThread.Destroy }function TProcessorThread.AverageProcessingTime: DWORD;begin  if (FCurState in [tcsProcessing]) then    Result := NewAverage(FAverageProcessing, GetTickCount - uProcessingStart)  else    Result := FAverageProcessingend; { TProcessorThread.AverageProcessingTime }function TProcessorThread.AverageWaitingTime: DWORD;begin  if (FCurState in [tcsWaiting, tcsCheckingDown]) then    Result := NewAverage(FAverageWaitingTime, GetTickCount - uWaitingStart)  else    Result := FAverageWaitingTimeend; { TProcessorThread.AverageWaitingTime }procedure TProcessorThread.Execute;type  THandleID = (hidTerminateThread, hidRequest, hidCheckPoolDown);var  WaitedTime: Integer;  Handles: array[THandleID] of THandle;begin  WriteLog('工作线程进常运行', 3);  //当前状态:初始化  FCurState := tcsInitializing;  try    //执行外部事件    FPool.DoThreadInitializing(Self);  except    on e: Exception do      sInitError := e.Message;  end;  //初始化完成,初始化Event绿灯  SetEvent(hInitFinished);  WriteLog('TProcessorThread.Execute::Initialized', 3);  //引用线程池的同步 Event  Handles[hidTerminateThread] := hThreadTerminated;  Handles[hidRequest] := FPool.hSemRequestCount;  Handles[hidCheckPoolDown] := FPool.hTimCheckPoolDown;  //时间戳,  //todo: 好像在线程中用 GetTickCount; 会不正常  uWaitingStart := GetTickCount;  //任务置空  FProcessingDataObject := nil;  //大巡环  while not terminated do  begin    //当前状态:等待    FCurState := tcsWaiting;    //阻塞线程,使线程休眠    case WaitForMultipleObjects(Length(Handles), @Handles, False, INFINITE) -      WAIT_OBJECT_0 of      WAIT_OBJECT_0 + ord(hidTerminateThread):        begin          WriteLog('TProcessorThread.Execute:: Terminate event signaled ', 5);          //当前状态:正在终止线程          FCurState := tcsTerminating;          //退出大巡环(结束线程)          Break;        end;      WAIT_OBJECT_0 + ord(hidRequest):        begin          WriteLog('TProcessorThread.Execute:: Request semaphore signaled ', 3);          //等待的时间          WaitedTime := GetTickCount - uWaitingStart;          //重新计算平均等待时间          FAverageWaitingTime := NewAverage(FAverageWaitingTime, WaitedTime);          //当前状态:申请任务          FCurState := tcsGetting;          //如果等待时间过短,则检查工作线程是否足够          if WaitedTime < 5 then            FPool.CheckThreadsForGrow;          //从线程池的任务队列中得到任务          FPool.GetRequest(FProcessingDataObject);          //开始处理的时间戳          uProcessingStart := GetTickCount;          //当前状态:执行任务          FCurState := tcsProcessing;          try{$IFNDEF NOLOGS}            WriteLog('Processing: ' + FProcessingDataObject.TextForLog, 2);{$ENDIF}            //执行任务            FPool.DoProcessRequest(FProcessingDataObject, Self);          except            on e: Exception do              WriteLog(                'OnProcessRequest for ' + FProcessingDataObject.TextForLog +                #13#10'raise Exception: ' + e.Message,                8                );          end;          //释放任务对象          csProcessingDataObject.Enter;          try            FProcessingDataObject.Free;            FProcessingDataObject := nil;          finally            csProcessingDataObject.Leave;          end;          //重新计算          FAverageProcessing := NewAverage(FAverageProcessing, GetTickCount -            uProcessingStart);          //当前状态:执行任务完毕          FCurState := tcsProcessed;          //执行线程外事件          FPool.DoProcessed;          uWaitingStart := GetTickCount;        end;      WAIT_OBJECT_0 + ord(hidCheckPoolDown):        begin          // !!! Never called under Win9x          WriteLog('TProcessorThread.Execute:: CheckPoolDown timer signaled ',            4);          //当前状态:线程池停机(检查并清除空闲线程和死线程)          FCurState := tcsCheckingDown;          FPool.CheckPoolDown;        end;    end;  end;  FCurState := tcsTerminating;  FPool.DoThreadFinalizing(Self);end; { TProcessorThread.Execute }function TProcessorThread.IamCurrentlyProcess(DataObj: TWorkItem): Boolean;begin  csProcessingDataObject.Enter;  try    Result := (FProcessingDataObject <> nil) and      DataObj.IsTheSame(FProcessingDataObject);  finally    csProcessingDataObject.Leave;  end;end; { TProcessorThread.IamCurrentlyProcess }function TProcessorThread.InfoText: string;const  ThreadStateNames: array[TThreadState] of string =  (    'tcsInitializing',    'tcsWaiting',    'tcsGetting',    'tcsProcessing',    'tcsProcessed',    'tcsTerminating',    'tcsCheckingDown'    );begin{$IFNDEF NOLOGS}  Result := Format(    '%5d: %15s, AverageWaitingTime=%6d, AverageProcessingTime=%6d',    [ThreadID, ThreadStateNames[FCurState], AverageWaitingTime,    AverageProcessingTime]      );  case FCurState of    tcsWaiting:      Result := Result + ', WaitingTime=' + IntToStr(GetTickCount -        uWaitingStart);    tcsProcessing:      Result := Result + ', ProcessingTime=' + IntToStr(GetTickCount -        uProcessingStart);  end;  csProcessingDataObject.Enter;  try    if FProcessingDataObject <> nil then      Result := Result + ' ' + FProcessingDataObject.TextForLog;  finally    csProcessingDataObject.Leave;  end;{$ENDIF}end; { TProcessorThread.InfoText }function TProcessorThread.IsDead: Boolean;begin  Result :=    Terminated or    (FPool.ThreadDeadTimeout > 0) and (FCurState = tcsProcessing) and    (GetTickCount - uProcessingStart > FPool.ThreadDeadTimeout);  if Result then    WriteLog('Thread dead', 5);end; { TProcessorThread.IsDead }function TProcessorThread.isFinished: Boolean;begin  Result := WaitForSingleObject(Handle, 0) = WAIT_OBJECT_0;end; { TProcessorThread.isFinished }function TProcessorThread.isIdle: Boolean;begin  // 如果线程状态是 tcsWaiting, tcsCheckingDown  // 并且 空间时间 > 100ms,  // 并且 平均等候任务时间大于平均工作时间的 50%  // 则视为空闲。  Result :=    (FCurState in [tcsWaiting, tcsCheckingDown]) and    (AverageWaitingTime > 100) and    (AverageWaitingTime * 2 > AverageProcessingTime);end; { TProcessorThread.isIdle }function TProcessorThread.NewAverage(OldAvg, NewVal: Integer): Integer;begin  Result := (OldAvg * 2 + NewVal) div 3;end; { TProcessorThread.NewAverage }procedure TProcessorThread.Terminate;begin  WriteLog('TProcessorThread.Terminate', 5);  inherited Terminate;  SetEvent(hThreadTerminated);end; { TProcessorThread.Terminate }procedure TProcessorThread.WriteLog(const Str: string; Level: Integer = 0);begin{$IFNDEF NOLOGS}  uThreadPool.WriteLog(Str, ThreadID, Level);{$ENDIF}end; { TProcessorThread.WriteLog }{******************************* TCriticalSection *******************************}constructor TCriticalSection.Create;begin  InitializeCriticalSection(FSection);end; { TCriticalSection.Create }destructor TCriticalSection.Destroy;begin  DeleteCriticalSection(FSection);end; { TCriticalSection.Destroy }procedure TCriticalSection.Enter;begin  EnterCriticalSection(FSection);end; { TCriticalSection.Enter }procedure TCriticalSection.Leave;begin  LeaveCriticalSection(FSection);end; { TCriticalSection.Leave }function TCriticalSection.TryEnter: Boolean;begin  Result := TryEnterCriticalSection(FSection);end; { TCriticalSection.TryEnter }procedure NoLogs(const Str: string; LogID: Integer = 0; Level: Integer = 0);beginend;initialization  WriteLog := NoLogs;end.
. 用法:// 创建线程池FThreadPool := TThreadsPool.Create(Self); // 创建线程池FThreadPool.ThreadsMin := 5; // 初始工作线程数FThreadPool.ThreadsMax := 50; // 最大允许工作线程数FThreadPool.OnProcessRequest := DealwithCommRecvData; // 线程工作函数(DealwithCommRecvData在工作者线程的Execute方法中被调用)// 使用线程池var AWorkItem: TRecvCommDataWorkItem; // 继承自TWorkItembegin AWorkItem := TRecvCommDataWorkItem.Create; Move(PData[0], AWorkItem.FRecvData[0], PDataLen); AWorkItem.FRecvDataLen := PDataLen; FThreadPool.AddRequest(AWorkItem); // 向线程池分配一个任务end; 虽然已经很详细了,可是我还是不会应用在程序中,请前辈抽出5分钟时间给我做个最简单的demo哪怕就是在memo中显示数字这种,只要能看到它能正常调用即可!万分感谢!
 
D

de410

Unregistered / Unconfirmed
GUEST, unregistred user!
线程池demohttp://www.2ccc.com/article.asp?articleid=3721
 
L

lc4148700

Unregistered / Unconfirmed
GUEST, unregistred user!
这个我看不懂啊!有没有利用上面我给的单元来调用的demo!
 
Z

znxia

Unregistered / Unconfirmed
GUEST, unregistred user!
楼主,估计没那么多人有足够的时间来看完你的代码啊。算是帮你顶吧。
 
W

wql

Unregistered / Unconfirmed
GUEST, unregistred user!
查离线数据库啊!
 

暗夜中独舞

Unregistered / Unconfirmed
GUEST, unregistred user!
上面的例子不是已经有调用的吗?楼主到底想要什么?
 
Y

YYGGQQ

Unregistered / Unconfirmed
GUEST, unregistred user!
www.itstudy.net 有你想要的
 
K

kk2000

Unregistered / Unconfirmed
GUEST, unregistred user!
这个最好自己动手!
 
L

lc4148700

Unregistered / Unconfirmed
GUEST, unregistred user!
我说简单一点,就是这个单元http://www.abcxd.com/delphi/abcxddelphi/delphiJQSQ/309.html想利用这个提供的调用,来完成一个简单的线程池实例!
 
K

kk2000

Unregistered / Unconfirmed
GUEST, unregistred user!
过今天给你一个调用样列! 没有什么难做的
 
D

dorry

Unregistered / Unconfirmed
GUEST, unregistred user!
unit Unit1;interfaceuses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, StdCtrls, ComCtrls,uThreadPool;type TForm1 = class(TForm) Button1: TButton; ProgressBar1: TProgressBar; Button2: TButton; Button3: TButton; Button4: TButton; Button5: TButton; Button6: TButton; Button7: TButton; Button8: TButton; ProgressBar2: TProgressBar; ProgressBar3: TProgressBar; Button9: TButton; Button10: TButton; procedure Button1Click(Sender: TObject); procedure Button2Click(Sender: TObject); procedure Button3Click(Sender: TObject); procedure Button4Click(Sender: TObject); procedure Button5Click(Sender: TObject); procedure Button6Click(Sender: TObject); procedure Button7Click(Sender: TObject); procedure FormCreate(Sender: TObject); procedure Button8Click(Sender: TObject); procedure Button9Click(Sender: TObject); procedure Button10Click(Sender: TObject); private { Private declarations } procedure DoMyWork(Sender : TThreadsPool; WorkItem : TWorkItem; aThread : TProcessorThread); public { Public declarations } ThreadsPool : TThreadsPool; Lock : TCriticalSection; end;var Form1: TForm1;implementationuses sevenzip;{$R *.dfm}type TMyWork1=class(TWorkItem) constructor Create; destructor Destroy; override; end; TMyWork2=class(TWorkItem) constructor Create; destructor Destroy; override; end; TMyWork3=class(TWorkItem) constructor Create; destructor Destroy; override; end;function ProgressCallback(sender: Pointer; total: boolean; value: int64): HRESULT; stdcall;begin if total then Form1.ProgressBar1.Max := value else Form1.ProgressBar1.Position := value; Result := S_OK;end;function ProgressCallback1(sender: Pointer; total: boolean; value: int64): HRESULT; stdcall;begin if total then Form1.ProgressBar2.Max := value else Form1.ProgressBar2.Position := value; Result := S_OK;end;function ProgressCallback2(sender: Pointer; total: boolean; value: int64): HRESULT; stdcall;begin if total then Form1.ProgressBar3.Max := value else Form1.ProgressBar3.Position := value; Result := S_OK;end;//解压文件夹测试procedure TForm1.Button10Click(Sender: TObject);begin ThreadsPool.AddRequest(TMyWork3.Create,true);end;procedure TForm1.Button1Click(Sender: TObject);begin with CreateInArchive(CLSID_CFormatZip) do begin OpenFile('C:/MyFirstCompont/ZlibCompress/delphi实现7Z格式压缩zip/demo/testzip/43680522Fee.zip'); SetProgressCallback(nil, ProgressCallback); ExtractTo('C:/MyFirstCompont/ZlibCompress/delphi实现7Z格式压缩zip/demo/testzip/43680522Fee'); end;end;function PasswordCallback(sender: Pointer; var password: WideString): HRESULT; stdcall;begin // call a dialog box ... password := 'password'; Result := S_OK;end;procedure TForm1.Button2Click(Sender: TObject);begin with CreateInArchive(CLSID_CFormatZip) do begin // using callback SetPasswordCallback(nil, PasswordCallback); // or setting password directly SetPassword('password'); OpenFile('c:/test.zip');end;end; //压缩文件procedure TForm1.Button3Click(Sender: TObject);var Arch: I7zOutArchive; aStream:TmemoryStream;begin aStream:=TmemoryStream.Create;try Arch := CreateOutArchive(CLSID_CFormat7z); Arch.AddFile('C:/7Zip线程压缩测试demo/MYNOTE.FBK', 'MYNOTE.FBK');//文件名 ,压缩文档内的路径/文件名 SetCompressionLevel(Arch, 5); SevenZipSetCompressionMethod(Arch, m7LZMA); Arch.SetProgressCallback(nil, ProgressCallback1); Arch.SaveToFile('C:/7Zip线程压缩测试demo/MYNOTE.7z'); finally aStream.Free;end;end;//压缩文件夹procedure TForm1.Button4Click(Sender: TObject);var Arch: I7zOutArchive; aStream:TmemoryStream;begin aStream:=TmemoryStream.Create; try Arch := CreateOutArchive(CLSID_CFormat7z); Arch.AddFiles('c:/test', 'test', '*.*', true); SetCompressionLevel(Arch, 4); SevenZipSetCompressionMethod(Arch, m7LZMA); Arch.SetProgressCallback(nil, ProgressCallback2); Arch.SaveToFile('c:/test.7z'); //Arch.SaveToStream(aStream); //aStream.SaveToFile('c:/test.7z'); finally aStream.Free; end;end;//加文件到流procedure TForm1.Button5Click(Sender: TObject);var Arch: I7zOutArchive; aStream:TmemoryStream;begin aStream:=TmemoryStream.Create; Arch := CreateOutArchive(CLSID_CFormat7z); Arch.AddFile('c:/test/T2030000.mdb', 'T2030000.mdb');//FileName:文件名 ,Path:压缩文档内的路径/文件名 //Arch.AddStream(aStream, soReference, faArchive, CurrentFileTime, CurrentFileTime, // 'T2030000.mdb', false, false); //Path:压缩文档内的路径/文件名 SetCompressionLevel(Arch, 5); SevenZipSetCompressionMethod(Arch, m7LZMA); Arch.SetProgressCallback(nil, ProgressCallback); //Arch.SaveToFile('c:/T2030000d.7z'); Arch.SaveToStream(aStream); aStream.SaveToFile('c:/T2030000.7z'); aStream.Free;end;//从流中回复文件procedure TForm1.Button6Click(Sender: TObject);var Arch: I7zOutArchive; aStream:TMemoryStream;begin aStream:=TMemoryStream.Create;try Arch := CreateOutArchive(CLSID_CFormat7z); Arch.AddFile('c:/test/T2030000.mdb', 'T2030000.mdb');//FileName:文件名 ,Path:压缩文档内的路径/文件名 //Arch.AddStream(aStream, soReference, faArchive, CurrentFileTime, CurrentFileTime, // 'T2030000.mdb', false, false); //Path:压缩文档内的路径/文件名 //AddStream 方法不需要使用! SetCompressionLevel(Arch, 4); SevenZipSetCompressionMethod(Arch, m7LZMA); Arch.SetProgressCallback(nil, ProgressCallback); Arch.SaveToStream(aStream); try aStream.Position:=0; with CreateInArchive(CLSID_CFormat7z) do begin OpenStream(T7zStream.Create(aStream, soOwned)); Arch.SetProgressCallback(nil, ProgressCallback); ExtractTo('c:/123'); end; except exit; end; //aStream.SaveToFile('c:/T2030000d.mdb');finally //aStream.Free; 内部已经释放end; end;//加文件夹到流procedure TForm1.Button7Click(Sender: TObject);var Arch: I7zOutArchive; aStream:TmemoryStream;begin aStream:=TmemoryStream.Create; try Arch := CreateOutArchive(CLSID_CFormat7z); Arch.AddFiles('c:/test', 'test', '*.*', true); //AddStream 方法不需要使用! SetCompressionLevel(Arch, 4); SevenZipSetCompressionMethod(Arch, m7LZMA); Arch.SetProgressCallback(nil, ProgressCallback); Arch.SaveToStream(aStream); aStream.SaveToFile('c:/test.7z'); finally aStream.Free; end; end;procedure TForm1.Button8Click(Sender: TObject);begin ThreadsPool.AddRequest(TMyWork1.Create,true);end;procedure TForm1.Button9Click(Sender: TObject);begin ThreadsPool.AddRequest(TMyWork2.Create,true);end;procedure TForm1.DoMyWork(Sender: TThreadsPool; WorkItem: TWorkItem; aThread: TProcessorThread);var i,j : integer; Rst: integer;begin if WorkItem.FI=1 then Button7Click(Sender); if WorkItem.FI=2 then Button3Click(Sender); if WorkItem.FI=3 then Button4Click(Sender);end;procedure TForm1.FormCreate(Sender: TObject);begin Lock := TCriticalSection.Create; ThreadsPool := TThreadsPool.Create(Self); ThreadsPool.OnProcessRequest := DoMyWork; ThreadsPool.ThreadsMin:=5; ThreadsPool.ThreadsMax := 20;end;{ TMyWork1 }constructor TMyWork1.Create;begin FI:=1;end;destructor TMyWork1.Destroy;begin inherited;end;{ TMyWork2 }constructor TMyWork2.Create;begin FI:=2;end;destructor TMyWork2.Destroy;begin inherited;end;{ TMyWork3 }constructor TMyWork3.Create;begin FI:=3;end;destructor TMyWork3.Destroy;begin inherited;end;end.
 
K

kk2000

Unregistered / Unconfirmed
GUEST, unregistred user!
:) (:
 

Similar threads

顶部